[ltt-dev] [UST PATCH] Re-write ustcomm parts of UST

Nils Carlson nils.carlson at ericsson.com
Fri Sep 24 09:14:34 EDT 2010


On Fri, 24 Sep 2010, David Goulet wrote:

> On 10-09-24 02:42 AM, Nils Carlson wrote:
>> Hi David,
>>
>> On Thu, 23 Sep 2010, David Goulet wrote:
>>
>> Hey Nils,
>>
>> I'll check this patch carefully tomorrow, but in the meantime we might
>> consider this for a communication protocol. Since TCF uses a JSON
>> protocol, we could use that for UST. We have an LTTng and UST TCF agent,
>> so unifying everything would be ideal.
>>
>>> Beware, this is not an external communication protocol I've been working
>>> on. It's an internal one, intra-library. And as we build everything at
>>> once with the same headers etc. we can make all sorts of assumptions
>>> that wouldn't be valid in anything else.
>
> Yes I know that but how can this stop us from still using JSON?
>

It doesn't stop us, but there is no reason to other than that it might be 
easy. And easy isn't really a good reason in this case.

We are linking this code into applications that are expected to run for 
years at a time on soft-real-time systems. So we want to minimise 
processor usage (avoid all unnecessary string parsing). We want to 
minimise the number of memory allocations (to avoid leaks). We want to 
minimise the number of potential bugs in UST, ESPECIALLY in libust that 
links into the application.
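
To make the point about avoiding string parsing concrete, here is a minimal sketch of the header-first framing described in the patch below: a fixed-size header carrying an enum command and a payload length, so the receiver allocates once and dispatches with a switch. Every name in it (sketch_header, sketch_cmd, SKETCH_MAX_PAYLOAD and the helper functions) is a hypothetical stand-in, not the actual ustcomm API, and the payload limit is an assumption added for safety.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define SKETCH_MAX_PAYLOAD 4096u	/* assumed upper bound, not from the patch */

/* Hypothetical command set; the names are illustrative only. */
enum sketch_cmd {
	SKETCH_GET_SHMID = 1,
	SKETCH_GET_SUBBUFFER,
	SKETCH_PUT_SUBBUFFER,
};

/* Hypothetical header. Both ends are built from the same headers, so the
 * raw struct can be sent as-is between them. */
struct sketch_header {
	uint32_t command;	/* one of enum sketch_cmd */
	uint32_t size;		/* length of the payload that follows */
};

/* Send exactly len bytes, retrying on short writes. Returns 0 on success. */
static int send_all(int fd, const void *buf, size_t len)
{
	const char *p = buf;

	while (len > 0) {
		ssize_t n = send(fd, p, len, 0);
		if (n <= 0)
			return -1;
		p += n;
		len -= (size_t)n;
	}
	return 0;
}

/* Receive exactly len bytes, retrying on short reads. Returns 0 on success. */
static int recv_all(int fd, void *buf, size_t len)
{
	char *p = buf;

	while (len > 0) {
		ssize_t n = recv(fd, p, len, 0);
		if (n <= 0)
			return -1;	/* error or peer closed */
		p += n;
		len -= (size_t)n;
	}
	return 0;
}

/* Receive the header, then allocate and receive exactly hdr->size bytes.
 * No scanning of the message is needed. The caller frees *data. */
static int recv_msg(int fd, struct sketch_header *hdr, char **data)
{
	assert(hdr != NULL && data != NULL);
	*data = NULL;
	if (recv_all(fd, hdr, sizeof(*hdr)) < 0)
		return -1;
	if (hdr->size == 0)
		return 0;
	if (hdr->size > SKETCH_MAX_PAYLOAD)
		return -1;
	*data = malloc(hdr->size);
	if (*data == NULL)
		return -1;
	if (recv_all(fd, *data, hdr->size) < 0) {
		free(*data);
		*data = NULL;
		return -1;
	}
	return 0;
}

/* A switch on the enum replaces the chain of string comparisons. */
static const char *dispatch(const struct sketch_header *hdr)
{
	switch (hdr->command) {
	case SKETCH_GET_SHMID:		return "get_shmid";
	case SKETCH_GET_SUBBUFFER:	return "get_subbuffer";
	case SKETCH_PUT_SUBBUFFER:	return "put_subbuffer";
	default:			return "unknown";
	}
}

/* Round trip over a socketpair; returns 0 if the message arrives intact. */
int sketch_roundtrip(void)
{
	int sv[2], ret = -1;
	const char payload[] = "ust/0";
	struct sketch_header out = { SKETCH_GET_SUBBUFFER, (uint32_t)sizeof(payload) };
	struct sketch_header in;
	char *data = NULL;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;
	if (send_all(sv[0], &out, sizeof(out)) == 0 &&
	    send_all(sv[0], payload, sizeof(payload)) == 0 &&
	    recv_msg(sv[1], &in, &data) == 0) {
		if (data != NULL && strcmp(data, payload) == 0 &&
		    strcmp(dispatch(&in), "get_subbuffer") == 0)
			ret = 0;
		free(data);
	}
	close(sv[0]);
	close(sv[1]);
	return ret;
}
```

Calling sketch_roundtrip() from a small main() exercises both directions. The design point is that the only allocation is the single malloc sized by the header, and the only string work left is on names inside the payload.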

This means that I want to avoid any and all non-glibc dependencies that I 
can. Linking in a whole library for message handling as a side-effect of 
linking in UST is not something most of my applications will accept.

Many applications also use isolcpus to gain an exclusive core for their 
processing needs. The libust thread will thus be competing for resources 
on a single core with a main process thread that is executing with 
SCHED_FIFO. This means that we can expect to be starved.

/Nils


> David
>
>>
>>> /Nils
>>
>>
>> As Mathieu suggests, and as was brought up at the CDT Summit this week,
>> we should *not* be dependent on the TCF JSON library and should consider
>> using libjson (http://oss.metaparadigm.com/json-c/) or our own
>> implementation.
>>
>> Cheers
>> David
>>
>> On 10-09-23 06:13 AM, Nils Carlson wrote:
>>>>> This is a very big patch, and so it requires a bit of explaining.
>>>>>
>>>>> This patch is a step on the way to accomplishing several goals I
>>>>> have in this
>>>>> area:
>>>>>
>>>>> 1. Use enums for commands and eliminate text-based commands. This
>>>>> does not mean
>>>>>    that we will stop processing strings for trace/channel and marker
>>>>> names;
>>>>>    just that the long series of if statements with token and string
>>>>> matching
>>>>>    will be replaced with a switch statement. To this end I have
>>>>> created a
>>>>>    ustcomm_header struct that contains the length of the data-field
>>>>> and some
>>>>>    other fields. This allows us to first receive the header, allocate
>>>>> memory
>>>>>    for the data and then receive the data; eliminating all scanning
>>>>> of messages.
>>>>>
>>>>> 2. Reduce the complexity of the implementation. To put it simply, I
>>>>> don't like
>>>>>    callbacks. They reduce transparency and make it difficult to
>>>>> follow the
>>>>>    flow of the code; so I have eliminated multipoll replacing it with
>>>>> a normal
>>>>>    epoll. I have also replaced almost all the different server,
>>>>> connection and
>>>>>    source structs with one, called ustcomm_sock.
>>>>>
>>>>> 3. Make ustd scale better. Currently ustd scales terribly. We
>>>>> allocate one
>>>>>    thread per-cpu per-channel per-process; five applications each
>>>>> with three
>>>>>    channels on a four cpu machine leads to 5*3*4=60 threads. Part of
>>>>> the reason
>>>>>    for this multitude of threads was that we used a ustcomm_request call
>>>>>    (consisting of a send and a receive) to wait for a subbuffer to be
>>>>> written.
>>>>>    The sequence for a subbuffer to be written was as follows:
>>>>>
>>>>>       Ustd calls send with a 'get_subbuffer' command, and then recv
>>>>> in one of
>>>>>       the threads and hangs on the recv on the socket.
>>>>>
>>>>>       Upon filling the subbuffer the traced app writes '1' to a pipe.
>>>>>
>>>>>       The ust_thread inside the app which was listening to the other
>>>>> end of the
>>>>>       pipe wakes up when the '1' is written. The callback from
>>>>> multipoll calls
>>>>>       a send which sends a reply to the ustd thread over the socket.
>>>>>
>>>>>       The ustd thread wakes up and reads the message, continuing
>>>>> along in its
>>>>>       execution.
>>>>>
>>>>>    I replace this with a bit of a different mechanism, which should
>>>>> allow us
>>>>>    to eventually reduce the number of threads to one per cpu:
>>>>>
>>>>>       Ustd requests a buffer_fd which causes the ust_thread inside
>>>>> the app
>>>>>       to send the file-descriptor for the read end of the pipe to ustd.
>>>>>
>>>>>       The ustd thread now does a read on the pipe, halting its
>>>>> execution until
>>>>>       the app fills the subbuffer and writes '1' to the pipe, waking
>>>>> up the ustd
>>>>>       thread.
>>>>>
>>>>>       Ustd now makes the 'get_subbuffer' call which the ust_thread
>>>>> inside the
>>>>>       app responds to with information about the subbuffer. Ustd writes it
>>>>> and then
>>>>>       goes back to the read call, hanging on the pipe.
>>>>>
>>>>>    So we are still stuck on the multitude of threads, but we are in
>>>>> a much better
>>>>>    position to move forward. Replacing the read with an epoll
>>>>> statement and then
>>>>>    pointing the epoll event data at the buffer struct containing the
>>>>> current
>>>>>    buffer to which the pipe belongs should be relatively easy. We
>>>>> can then
>>>>>    instead of spawning a new thread for each buffer just allocate the
>>>>>    buffer_info struct and assign it to one of the per-cpu threads in
>>>>> ustd to
>>>>>    poll on.
>>>>>
>>>>> 4. Replace poll with epoll which scales better, especially for
>>>>>    events << (nr of fds). This is complete.
>>>>>
>>>>> 5. Allow UST to handle arbitrarily long unix socket names. This is
>>>>> done by
>>>>>    careful allocation of the sockaddr_un struct with a dynamic
>>>>> length.
>>>>>    Truncating is ugly and dangerous.
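
The follow-up step outlined in point 3 (replace the blocking read with epoll and point the epoll event data at the buffer struct that owns the pipe) might look roughly like the sketch below. It is only an illustration under assumptions: sketch_buffer is a hypothetical stand-in for buffer_info, the producer is simulated in the same process, and none of the function names come from the patch.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

#define SKETCH_MAX_EVENTS 10

/* Hypothetical stand-in for the per-buffer bookkeeping struct. */
struct sketch_buffer {
	const char *name;
	int pipe_rd;	/* read end, polled by the consumer thread */
	int pipe_wr;	/* write end, signalled when a subbuffer is full */
	int wakeups;	/* number of notifications handled so far */
};

/* Create the notification pipe and register its read end with epoll.
 * data.ptr carries the buffer, so no fd-to-buffer lookup is needed. */
static int sketch_buffer_add(int epfd, struct sketch_buffer *buf,
			     const char *name)
{
	int fds[2];
	struct epoll_event ev = { .events = EPOLLIN, .data = { .ptr = buf } };

	if (pipe(fds) < 0)
		return -1;
	buf->name = name;
	buf->pipe_rd = fds[0];
	buf->pipe_wr = fds[1];
	buf->wakeups = 0;
	if (epoll_ctl(epfd, EPOLL_CTL_ADD, buf->pipe_rd, &ev) < 0) {
		close(fds[0]);
		close(fds[1]);
		return -1;
	}
	return 0;
}

/* One pass of the consumer loop: wait, then service every ready buffer.
 * Returns the number of ready events, or -1 on error. */
static int sketch_poll_once(int epfd, int timeout_ms)
{
	struct epoll_event events[SKETCH_MAX_EVENTS];
	int i, n;

	n = epoll_wait(epfd, events, SKETCH_MAX_EVENTS, timeout_ms);
	for (i = 0; i < n; i++) {
		struct sketch_buffer *buf = events[i].data.ptr;
		char c;

		/* Consume the notification byte; a real consumer would now
		 * issue get_subbuffer for this buffer. */
		if (read(buf->pipe_rd, &c, 1) == 1)
			buf->wakeups++;
	}
	return n;
}

/* Two buffers served by one thread; only the second one is signalled.
 * Returns 0 if exactly that buffer was woken. */
int sketch_epoll_demo(void)
{
	struct sketch_buffer a, b;
	int epfd, handled, ret = -1;

	epfd = epoll_create1(0);
	if (epfd < 0)
		return -1;
	if (sketch_buffer_add(epfd, &a, "chan_a_0") < 0)
		goto close_epfd;
	if (sketch_buffer_add(epfd, &b, "chan_b_0") < 0)
		goto close_a;

	/* Producer side: the subbuffer of b is full, write '1' to the pipe. */
	if (write(b.pipe_wr, "1", 1) != 1)
		goto close_b;

	handled = sketch_poll_once(epfd, 1000);
	if (handled == 1 && a.wakeups == 0 && b.wakeups == 1)
		ret = 0;

close_b:
	close(b.pipe_rd);
	close(b.pipe_wr);
close_a:
	close(a.pipe_rd);
	close(a.pipe_wr);
close_epfd:
	close(epfd);
	return ret;
}
```

With this shape, adding a buffer is one epoll_ctl call instead of a new thread, so a single per-cpu thread can serve any number of pipes.
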
>>>>>
>>>>> There is a lot of work still left to be done. This is only the first
>>>>> of a
>>>>> number of patches that I expect in this area. If someone feels like
>>>>> tackling
>>>>> ustd head on to reduce the number of threads that would be great.
>>>>>
>>>>> I have kept Pierre-Marc's form of error handling for the I/O wrapping
>>>>> functions
>>>>> because I want to propagate return codes up to the apps that are
>>>>> using them
>>>>> so they can close file-descriptors and free associated resources. If
>>>>> somebody
>>>>> knows of a better approach please make yourself heard.
>>>>>
>>>>> Signed-off-by: Nils Carlson <nils.carlson at ericsson.com>
>>>>> ---
>>>>>  include/ust/ustd.h     |   12 +-
>>>>>  libust/buffers.h       |    5 +
>>>>>  libust/tracectl.c      |  474 ++++++++++++++----------
>>>>>  libustcmd/ustcmd.c     |   15 +-
>>>>>  libustcomm/Makefile.am |    5 +-
>>>>>  libustcomm/multipoll.c |  130 -------
>>>>>  libustcomm/multipoll.h |   44 ---
>>>>>  libustcomm/ustcomm.c   |  969
>>>>> +++++++++++++++++++-----------------------------
>>>>>  libustcomm/ustcomm.h   |   83 ++---
>>>>>  libustd/libustd.c      |  277 ++++++++++-----
>>>>>  10 files changed, 902 insertions(+), 1112 deletions(-)
>>>>>  delete mode 100644 libustcomm/multipoll.c
>>>>>  delete mode 100644 libustcomm/multipoll.h
>>>>>
>>>>> diff --git a/include/ust/ustd.h b/include/ust/ustd.h
>>>>> index 5fec7f9..7ce063f 100644
>>>>> --- a/include/ust/ustd.h
>>>>> +++ b/include/ust/ustd.h
>>>>> @@ -29,16 +29,18 @@
>>>>>  #include <pthread.h>
>>>>>  #include <dirent.h>
>>>>>  #include <ust/kcompat/kcompat.h>
>>>>> +#include <urcu/list.h>
>>>>>
>>>>>  #define USTD_DEFAULT_TRACE_PATH "/tmp/usttrace"
>>>>>
>>>>> -struct ustcomm_connection;
>>>>> -struct ustcomm_ustd;
>>>>> +struct ustcomm_sock;
>>>>>
>>>>>  struct buffer_info {
>>>>>       const char *name;
>>>>>       pid_t pid;
>>>>> -     struct ustcomm_connection *conn;
>>>>> +     int app_sock;
>>>>> +     /* The pipe file descriptor */
>>>>> +     int pipe_fd;
>>>>>
>>>>>       int shmid;
>>>>>       int bufstruct_shmid;
>>>>> @@ -73,7 +75,9 @@ struct libustd_instance {
>>>>>       struct libustd_callbacks *callbacks;
>>>>>       int quit_program;
>>>>>       int is_init;
>>>>> -     struct ustcomm_ustd *comm;
>>>>> +     struct list_head connections;
>>>>> +     int epoll_fd;
>>>>> +     struct ustcomm_sock *listen_sock;
>>>>>       char *sock_path;
>>>>>       pthread_mutex_t mutex;
>>>>>       int active_buffers;
>>>>> diff --git a/libust/buffers.h b/libust/buffers.h
>>>>> index 3044500..a2ad83e 100644
>>>>> --- a/libust/buffers.h
>>>>> +++ b/libust/buffers.h
>>>>> @@ -82,6 +82,11 @@ struct ust_buffer {
>>>>>       int data_ready_fd_write;
>>>>>       /* the reading end of the pipe */
>>>>>       int data_ready_fd_read;
>>>>> +     /*
>>>>> +      * List of buffers with an open pipe, used for fork and forced
>>>>> subbuffer
>>>>> +      * switch.
>>>>> +      */
>>>>> +     struct list_head open_buffers_list;
>>>>>
>>>>>       unsigned int finalized;
>>>>>  //ust//      struct timer_list switch_timer; /* timer for periodical
>>>>> switch */
>>>>> diff --git a/libust/tracectl.c b/libust/tracectl.c
>>>>> index 60c375b..3472ca9 100644
>>>>> --- a/libust/tracectl.c
>>>>> +++ b/libust/tracectl.c
>>>>> @@ -26,13 +26,15 @@
>>>>>  #include <stdint.h>
>>>>>  #include <pthread.h>
>>>>>  #include <signal.h>
>>>>> +#include <sys/epoll.h>
>>>>> +#include <sys/time.h>
>>>>>  #include <sys/types.h>
>>>>>  #include <sys/socket.h>
>>>>> -#include <sys/un.h>
>>>>>  #include <fcntl.h>
>>>>>  #include <poll.h>
>>>>>  #include <regex.h>
>>>>>  #include <urcu/uatomic_arch.h>
>>>>> +#include <urcu/list.h>
>>>>>
>>>>>  #include <ust/marker.h>
>>>>>  #include <ust/tracepoint.h>
>>>>> @@ -42,7 +44,6 @@
>>>>>  #include "ustcomm.h"
>>>>>  #include "buffers.h"
>>>>>  #include "marker-control.h"
>>>>> -#include "multipoll.h"
>>>>>
>>>>>  #define USTSIGNAL SIGIO
>>>>>
>>>>> @@ -55,50 +56,18 @@
>>>>>   */
>>>>>  s64 pidunique = -1LL;
>>>>>
>>>>> -extern struct chan_info_struct chan_infos[];
>>>>> +static int epoll_fd;
>>>>> +static struct ustcomm_sock *listen_sock;
>>>>>
>>>>> -struct list_head blocked_consumers = LIST_HEAD_INIT(blocked_consumers);
>>>>> +extern struct chan_info_struct chan_infos[];
>>>>>
>>>>> -static struct ustcomm_app ustcomm_app;
>>>>> +static struct list_head open_buffers_list =
>>>>> LIST_HEAD_INIT(open_buffers_list);
>>>>>
>>>>> -struct tracecmd { /* no padding */
>>>>> -     uint32_t size;
>>>>> -     uint16_t command;
>>>>> -};
>>>>> +static struct list_head ust_socks = LIST_HEAD_INIT(ust_socks);
>>>>>
>>>>>  /* volatile because shared between the listener and the main thread */
>>>>>  int buffers_to_export = 0;
>>>>>
>>>>> -struct trctl_msg {
>>>>> -     /* size: the size of all the fields except size itself */
>>>>> -     uint32_t size;
>>>>> -     uint16_t type;
>>>>> -     /* Only the necessary part of the payload is transferred. It
>>>>> -         * may even be none of it.
>>>>> -         */
>>>>> -     char payload[94];
>>>>> -};
>>>>> -
>>>>> -struct consumer_channel {
>>>>> -     int fd;
>>>>> -     struct ltt_channel_struct *chan;
>>>>> -};
>>>>> -
>>>>> -struct blocked_consumer {
>>>>> -     int fd_consumer;
>>>>> -     int fd_producer;
>>>>> -     int tmp_poll_idx;
>>>>> -
>>>>> -     /* args to ustcomm_send_reply */
>>>>> -     struct ustcomm_server server;
>>>>> -     struct ustcomm_source src;
>>>>> -
>>>>> -     /* args to ust_buffers_get_subbuf */
>>>>> -     struct ust_buffer *buf;
>>>>> -
>>>>> -     struct list_head list;
>>>>> -};
>>>>> -
>>>>>  static long long make_pidunique(void)
>>>>>  {
>>>>>       s64 retval;
>>>>> @@ -122,7 +91,12 @@ static void print_markers(FILE *fp)
>>>>>       marker_iter_start(&iter);
>>>>>
>>>>>       while (iter.marker) {
>>>>> -             fprintf(fp, "marker: %s/%s %d \"%s\" %p\n",
>>>>> iter.marker->channel, iter.marker->name,
>>>>> (int)imv_read(iter.marker->state), iter.marker->format,
>>>>> iter.marker->location);
>>>>> +             fprintf(fp, "marker: %s/%s %d \"%s\" %p\n",
>>>>> +                     iter.marker->channel,
>>>>> +                     iter.marker->name,
>>>>> +                     (int)imv_read(iter.marker->state),
>>>>> +                     iter.marker->format,
>>>>> +                     iter.marker->location);
>>>>>               marker_iter_next(&iter);
>>>>>       }
>>>>>       unlock_markers();
>>>>> @@ -143,8 +117,6 @@ static void print_trace_events(FILE *fp)
>>>>>       unlock_trace_events();
>>>>>  }
>>>>>
>>>>> -static int init_socket(void);
>>>>> -
>>>>>  /* Ask the daemon to collect a trace called trace_name and being
>>>>>   * produced by this pid.
>>>>>   *
>>>>> @@ -179,7 +151,8 @@ static void inform_consumer_daemon(const char
>>>>> *trace_name)
>>>>>                               }
>>>>>                               result = ustcomm_request_consumer(pid,
>>>>> buf);
>>>>>                               if (result == -1) {
>>>>> -                                     WARN("Failed to request
>>>>> collection for channel %s. Is the daemon available?",
>>>>> trace->channels[i].channel_name);
>>>>> +                                     WARN("Failed to request
>>>>> collection for channel %s. Is the daemon available?",
>>>>> +
>>>>> trace->channels[i].channel_name);
>>>>>                                       /* continue even if fail */
>>>>>                               }
>>>>>                               free(buf);
>>>>> @@ -192,74 +165,6 @@ static void inform_consumer_daemon(const char
>>>>> *trace_name)
>>>>>       ltt_unlock_traces();
>>>>>  }
>>>>>
>>>>> -int process_blkd_consumer_act(void *priv, int fd, short events)
>>>>> -{
>>>>> -     int result;
>>>>> -     long consumed_old = 0;
>>>>> -     char *reply;
>>>>> -     struct blocked_consumer *bc = (struct blocked_consumer *) priv;
>>>>> -     char inbuf;
>>>>> -
>>>>> -     result = read(bc->fd_producer, &inbuf, 1);
>>>>> -     if (result == -1) {
>>>>> -             PERROR("read");
>>>>> -             return -1;
>>>>> -     }
>>>>> -     if (result == 0) {
>>>>> -             int res;
>>>>> -             DBG("listener: got messsage that a buffer ended");
>>>>> -
>>>>> -             res = close(bc->fd_producer);
>>>>> -             if (res == -1) {
>>>>> -                     PERROR("close");
>>>>> -             }
>>>>> -
>>>>> -             list_del(&bc->list);
>>>>> -
>>>>> -             result = ustcomm_send_reply(&bc->server, "END", &bc->src);
>>>>> -             if (result < 0) {
>>>>> -                     ERR("ustcomm_send_reply failed");
>>>>> -                     return -1;
>>>>> -             }
>>>>> -
>>>>> -             return 0;
>>>>> -     }
>>>>> -
>>>>> -     result = ust_buffers_get_subbuf(bc->buf, &consumed_old);
>>>>> -     if (result == -EAGAIN) {
>>>>> -             WARN("missed buffer?");
>>>>> -             return 0;
>>>>> -     } else if (result < 0) {
>>>>> -             ERR("ust_buffers_get_subbuf: error: %s",
>>>>> strerror(-result));
>>>>> -     }
>>>>> -     if (asprintf(&reply, "%s %ld", "OK", consumed_old) < 0) {
>>>>> -               ERR("process_blkd_consumer_act : asprintf failed (OK
>>>>> %ld)",
>>>>> -                consumed_old);
>>>>> -               return -1;
>>>>> -     }
>>>>> -     result = ustcomm_send_reply(&bc->server, reply, &bc->src);
>>>>> -     if (result < 0) {
>>>>> -             ERR("ustcomm_send_reply failed");
>>>>> -             free(reply);
>>>>> -             return -1;
>>>>> -     }
>>>>> -     free(reply);
>>>>> -
>>>>> -     list_del(&bc->list);
>>>>> -
>>>>> -     return 0;
>>>>> -}
>>>>> -
>>>>> -void blocked_consumers_add_to_mp(struct mpentries *ent)
>>>>> -{
>>>>> -     struct blocked_consumer *bc;
>>>>> -
>>>>> -     list_for_each_entry(bc, &blocked_consumers, list) {
>>>>> -             multipoll_add(ent, bc->fd_producer, POLLIN,
>>>>> process_blkd_consumer_act, bc, NULL);
>>>>> -     }
>>>>> -
>>>>> -}
>>>>> -
>>>>>  void seperate_channel_cpu(const char *channel_and_cpu, char
>>>>> **channel, int *cpu)
>>>>>  {
>>>>>       const char *sep;
>>>>> @@ -279,7 +184,7 @@ void seperate_channel_cpu(const char
>>>>> *channel_and_cpu, char **channel, int *cpu)
>>>>>       }
>>>>>  }
>>>>>
>>>>> -static int do_cmd_get_shmid(const char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>> +static int do_cmd_get_shmid(const char *recvbuf, int sock)
>>>>>  {
>>>>>       int retval = 0;
>>>>>       struct ust_trace *trace;
>>>>> @@ -333,7 +238,7 @@ static int do_cmd_get_shmid(const char *recvbuf,
>>>>> struct ustcomm_source *src)
>>>>>                               goto free_short_chan_name;
>>>>>                       }
>>>>>
>>>>> -                     result =
>>>>> ustcomm_send_reply(&ustcomm_app.server, reply, src);
>>>>> +                     result = ustcomm_send_reply(reply, sock);
>>>>>                       if (result) {
>>>>>                               ERR("ustcomm_send_reply failed");
>>>>>                               free(reply);
>>>>> @@ -359,7 +264,7 @@ static int do_cmd_get_shmid(const char *recvbuf,
>>>>> struct ustcomm_source *src)
>>>>>       return retval;
>>>>>  }
>>>>>
>>>>> -static int do_cmd_get_n_subbufs(const char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>> +static int do_cmd_get_n_subbufs(const char *recvbuf, int sock)
>>>>>  {
>>>>>       int retval = 0;
>>>>>       struct ust_trace *trace;
>>>>> @@ -411,7 +316,7 @@ static int do_cmd_get_n_subbufs(const char
>>>>> *recvbuf, struct ustcomm_source *src)
>>>>>                               goto free_short_chan_name;
>>>>>                       }
>>>>>
>>>>> -                     result =
>>>>> ustcomm_send_reply(&ustcomm_app.server, reply, src);
>>>>> +                     result = ustcomm_send_reply(reply, sock);
>>>>>                       if (result) {
>>>>>                               ERR("ustcomm_send_reply failed");
>>>>>                               free(reply);
>>>>> @@ -435,7 +340,7 @@ static int do_cmd_get_n_subbufs(const char
>>>>> *recvbuf, struct ustcomm_source *src)
>>>>>       return retval;
>>>>>  }
>>>>>
>>>>> -static int do_cmd_get_subbuf_size(const char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>> +static int do_cmd_get_subbuf_size(const char *recvbuf, int sock)
>>>>>  {
>>>>>       int retval = 0;
>>>>>       struct ust_trace *trace;
>>>>> @@ -487,7 +392,7 @@ static int do_cmd_get_subbuf_size(const char
>>>>> *recvbuf, struct ustcomm_source *sr
>>>>>                               goto free_short_chan_name;
>>>>>                       }
>>>>>
>>>>> -                     result =
>>>>> ustcomm_send_reply(&ustcomm_app.server, reply, src);
>>>>> +                     result = ustcomm_send_reply(reply, sock);
>>>>>                       if (result) {
>>>>>                               ERR("ustcomm_send_reply failed");
>>>>>                               free(reply);
>>>>> @@ -524,7 +429,7 @@ static unsigned int pow2_higher_or_eq(unsigned
>>>>> int v)
>>>>>               return retval<<1;
>>>>>  }
>>>>>
>>>>> -static int do_cmd_set_subbuf_size(const char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>> +static int do_cmd_set_subbuf_size(const char *recvbuf, int sock)
>>>>>  {
>>>>>       char *channel_slash_size;
>>>>>       char *ch_name = NULL;
>>>>> @@ -581,7 +486,7 @@ static int do_cmd_set_subbuf_size(const char
>>>>> *recvbuf, struct ustcomm_source *sr
>>>>>       return retval;
>>>>>  }
>>>>>
>>>>> -static int do_cmd_set_subbuf_num(const char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>> +static int do_cmd_set_subbuf_num(const char *recvbuf, int sock)
>>>>>  {
>>>>>       char *channel_slash_num;
>>>>>       char *ch_name = NULL;
>>>>> @@ -638,7 +543,102 @@ static int do_cmd_set_subbuf_num(const char
>>>>> *recvbuf, struct ustcomm_source *src
>>>>>       return retval;
>>>>>  }
>>>>>
>>>>> -static int do_cmd_get_subbuffer(const char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>> +static int do_cmd_get_subbuffer(const char *recvbuf, int sock)
>>>>> +{
>>>>> +     int retval = 0, found = 0;;
>>>>> +     int i, ch_cpu, result;
>>>>> +     long consumed_old = 0;
>>>>> +     struct ust_trace *trace;
>>>>> +     char trace_name[] = "auto";
>>>>> +     char *channel_and_cpu;
>>>>> +     char *ch_name;
>>>>> +
>>>>> +     DBG("get_subbuf");
>>>>> +
>>>>> +     channel_and_cpu = nth_token(recvbuf, 1);
>>>>> +     if(channel_and_cpu == NULL) {
>>>>> +             ERR("cannot parse channel");
>>>>> +             retval = -1;
>>>>> +             goto end;
>>>>> +     }
>>>>> +
>>>>> +     seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu);
>>>>> +     if(ch_cpu == -1) {
>>>>> +             ERR("problem parsing channel name");
>>>>> +             retval = -1;
>>>>> +             goto free_short_chan_name;
>>>>> +     }
>>>>> +
>>>>> +     ltt_lock_traces();
>>>>> +     trace = _ltt_trace_find(trace_name);
>>>>> +
>>>>> +     if(trace == NULL) {
>>>>> +             int result;
>>>>> +
>>>>> +             DBG("Cannot find trace. It was likely destroyed by the
>>>>> user.");
>>>>> +             result = ustcomm_send_reply("NOTFOUND", sock);
>>>>> +             if(result) {
>>>>> +                     ERR("ustcomm_send_reply failed");
>>>>> +                     retval = -1;
>>>>> +                     goto unlock_traces;
>>>>> +             }
>>>>> +
>>>>> +             goto unlock_traces;
>>>>> +     }
>>>>> +
>>>>> +     for(i=0; i<trace->nr_channels; i++) {
>>>>> +             struct ust_channel *channel = &trace->channels[i];
>>>>> +
>>>>> +             if(!strcmp(trace->channels[i].channel_name, ch_name)) {
>>>>> +                     struct ust_buffer *buf = channel->buf[ch_cpu];
>>>>> +                     char *reply;
>>>>> +
>>>>> +                     found = 1;
>>>>> +
>>>>> +                     result = ust_buffers_get_subbuf(buf,
>>>>> &consumed_old);
>>>>> +                     if(result == -EAGAIN) {
>>>>> +                             WARN("missed buffer?");
>>>>> +                             return 0;
>>>>> +                     } else if (result < 0) {
>>>>> +                             ERR("ust_buffers_get_subbuf: error:
>>>>> %s", strerror(-result));
>>>>> +                     }
>>>>> +                     if (asprintf(&reply, "%s %ld", "OK",
>>>>> consumed_old) < 0) {
>>>>> +                             ERR("process_blkd_consumer_act :
>>>>> asprintf failed (OK %ld)",
>>>>> +                                 consumed_old);
>>>>> +                             return -1;
>>>>> +                     }
>>>>> +                     result = ustcomm_send_reply(reply, sock);
>>>>> +                     if (result < 0) {
>>>>> +                             ERR("ustcomm_send_reply failed");
>>>>> +                             free(reply);
>>>>> +                             return -1;
>>>>> +                     }
>>>>> +                     free(reply);
>>>>> +
>>>>> +                     break;
>>>>> +             }
>>>>> +     }
>>>>> +     if(found == 0) {
>>>>> +             result = ustcomm_send_reply("NOTFOUND", sock);
>>>>> +             if (result <= 0) {
>>>>> +                     ERR("ustcomm_send_reply failed");
>>>>> +                     return -1;
>>>>> +             }
>>>>> +             ERR("unable to find channel");
>>>>> +     }
>>>>> +
>>>>> +     unlock_traces:
>>>>> +     ltt_unlock_traces();
>>>>> +
>>>>> +     free_short_chan_name:
>>>>> +     free(ch_name);
>>>>> +
>>>>> +     end:
>>>>> +     return retval;
>>>>> +}
>>>>> +
>>>>> +
>>>>> +static int do_cmd_get_buffer_fd(const char *recvbuf, int sock)
>>>>>  {
>>>>>       int retval = 0;
>>>>>       struct ust_trace *trace;
>>>>> @@ -648,8 +648,9 @@ static int do_cmd_get_subbuffer(const char
>>>>> *recvbuf, struct ustcomm_source *src)
>>>>>       int found = 0;
>>>>>       char *ch_name;
>>>>>       int ch_cpu;
>>>>> +     struct ustcomm_header header;
>>>>>
>>>>> -     DBG("get_subbuf");
>>>>> +     DBG("get_buffer_fd");
>>>>>
>>>>>       channel_and_cpu = nth_token(recvbuf, 1);
>>>>>       if (channel_and_cpu == NULL) {
>>>>> @@ -672,7 +673,7 @@ static int do_cmd_get_subbuffer(const char
>>>>> *recvbuf, struct ustcomm_source *src)
>>>>>               int result;
>>>>>
>>>>>               DBG("Cannot find trace. It was likely destroyed by the
>>>>> user.");
>>>>> -             result = ustcomm_send_reply(&ustcomm_app.server,
>>>>> "NOTFOUND", src);
>>>>> +             result = ustcomm_send_reply("NOTFOUND", sock);
>>>>>               if (result) {
>>>>>                       ERR("ustcomm_send_reply failed");
>>>>>                       retval = -1;
>>>>> @@ -687,22 +688,16 @@ static int do_cmd_get_subbuffer(const char
>>>>> *recvbuf, struct ustcomm_source *src)
>>>>>
>>>>>               if (!strcmp(trace->channels[i].channel_name, ch_name)) {
>>>>>                       struct ust_buffer *buf = channel->buf[ch_cpu];
>>>>> -                     struct blocked_consumer *bc;
>>>>>
>>>>>                       found = 1;
>>>>>
>>>>> -                     bc = (struct blocked_consumer *)
>>>>> zmalloc(sizeof(struct blocked_consumer));
>>>>> -                     if (bc == NULL) {
>>>>> -                             ERR("zmalloc returned NULL");
>>>>> +                     header.size = 0;
>>>>> +                     header.fd_included = 1;
>>>>> +                     if (ustcomm_send_fd(sock, &header, NULL,
>>>>> +                                         &buf->data_ready_fd_read)
>>>>> <= 0) {
>>>>> +                             ERR("ustcomm_send_fd failed\n");
>>>>>                               goto unlock_traces;
>>>>>                       }
>>>>> -                     bc->fd_consumer = src->fd;
>>>>> -                     bc->fd_producer = buf->data_ready_fd_read;
>>>>> -                     bc->buf = buf;
>>>>> -                     bc->src = *src;
>>>>> -                     bc->server = ustcomm_app.server;
>>>>> -
>>>>> -                     list_add(&bc->list, &blocked_consumers);
>>>>>
>>>>>                       /* Being here is the proof the daemon has
>>>>> mapped the buffer in its
>>>>>                        * memory. We may now decrement buffers_to_export.
>>>>> @@ -712,6 +707,10 @@ static int do_cmd_get_subbuffer(const char
>>>>> *recvbuf, struct ustcomm_source *src)
>>>>>                               STORE_SHARED(buffers_to_export,
>>>>> LOAD_SHARED(buffers_to_export)-1);
>>>>>                       }
>>>>>
>>>>> +                     /* The buffer has been exported, ergo, we can
>>>>> add it to the
>>>>> +                      * list of open buffers
>>>>> +                      */
>>>>> +                     list_add(&buf->open_buffers_list,
>>>>> &open_buffers_list);
>>>>>                       break;
>>>>>               }
>>>>>       }
>>>>> @@ -729,7 +728,7 @@ static int do_cmd_get_subbuffer(const char
>>>>> *recvbuf, struct ustcomm_source *src)
>>>>>       return retval;
>>>>>  }
>>>>>
>>>>> -static int do_cmd_put_subbuffer(const char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>> +static int do_cmd_put_subbuffer(const char *recvbuf, int sock)
>>>>>  {
>>>>>       int retval = 0;
>>>>>       struct ust_trace *trace;
>>>>> @@ -779,7 +778,7 @@ static int do_cmd_put_subbuffer(const char
>>>>> *recvbuf, struct ustcomm_source *src)
>>>>>
>>>>>       if (trace == NULL) {
>>>>>               DBG("Cannot find trace. It was likely destroyed by the
>>>>> user.");
>>>>> -             result = ustcomm_send_reply(&ustcomm_app.server,
>>>>> "NOTFOUND", src);
>>>>> +             result = ustcomm_send_reply("NOTFOUND", sock);
>>>>>               if (result) {
>>>>>                       ERR("ustcomm_send_reply failed");
>>>>>                       retval = -1;
>>>>> @@ -814,7 +813,7 @@ static int do_cmd_put_subbuffer(const char
>>>>> *recvbuf, struct ustcomm_source *src)
>>>>>                               }
>>>>>                       }
>>>>>
>>>>> -                     result =
>>>>> ustcomm_send_reply(&ustcomm_app.server, reply, src);
>>>>> +                     result = ustcomm_send_reply(reply, sock);
>>>>>                       if (result) {
>>>>>                               ERR("ustcomm_send_reply failed");
>>>>>                               free(reply);
>>>>> @@ -845,26 +844,26 @@ static int do_cmd_put_subbuffer(const char
>>>>> *recvbuf, struct ustcomm_source *src)
>>>>>
>>>>>  static void listener_cleanup(void *ptr)
>>>>>  {
>>>>> -     ustcomm_fini_app(&ustcomm_app, 0);
>>>>> +     ustcomm_del_named_sock(listen_sock, 0);
>>>>>  }
>>>>>
>>>>>  static void do_cmd_force_switch()
>>>>>  {
>>>>> -     struct blocked_consumer *bc;
>>>>> +     struct ust_buffer *buf;
>>>>>
>>>>> -     list_for_each_entry(bc, &blocked_consumers, list) {
>>>>> -             ltt_force_switch(bc->buf, FORCE_FLUSH);
>>>>> +     list_for_each_entry(buf, &open_buffers_list,
>>>>> +                         open_buffers_list) {
>>>>> +             ltt_force_switch(buf, FORCE_FLUSH);
>>>>>       }
>>>>>  }
>>>>>
>>>>> -int process_client_cmd(char *recvbuf, struct ustcomm_source *src)
>>>>> +static int process_client_cmd(char *recvbuf, int sock)
>>>>>  {
>>>>>       int result;
>>>>>       char trace_name[] = "auto";
>>>>>       char trace_type[] = "ustrelay";
>>>>>       int len;
>>>>>
>>>>> -     DBG("received a message! it's: %s", recvbuf);
>>>>>       len = strlen(recvbuf);
>>>>>
>>>>>       if (!strcmp(recvbuf, "print_markers")) {
>>>>> @@ -878,7 +877,7 @@ int process_client_cmd(char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>>               print_markers(fp);
>>>>>               fclose(fp);
>>>>>
>>>>> -             result = ustcomm_send_reply(&ustcomm_app.server, ptr,
>>>>> src);
>>>>> +             result = ustcomm_send_reply(ptr, sock);
>>>>>
>>>>>               free(ptr);
>>>>>       } else if (!strcmp(recvbuf, "print_trace_events")) {
>>>>> @@ -897,7 +896,7 @@ int process_client_cmd(char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>>               print_trace_events(fp);
>>>>>               fclose(fp);
>>>>>
>>>>> -             result = ustcomm_send_reply(&ustcomm_app.server, ptr,
>>>>> src);
>>>>> +             result = ustcomm_send_reply(ptr, sock);
>>>>>               if (result < 0) {
>>>>>                       ERR("list_trace_events failed");
>>>>>                       return -1;
>>>>> @@ -1002,11 +1001,11 @@ int process_client_cmd(char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>>                       return -1;
>>>>>               }
>>>>>       } else if (nth_token_is(recvbuf, "get_shmid", 0) == 1) {
>>>>> -             do_cmd_get_shmid(recvbuf, src);
>>>>> +             do_cmd_get_shmid(recvbuf, sock);
>>>>>       } else if (nth_token_is(recvbuf, "get_n_subbufs", 0) == 1) {
>>>>> -             do_cmd_get_n_subbufs(recvbuf, src);
>>>>> +             do_cmd_get_n_subbufs(recvbuf, sock);
>>>>>       } else if (nth_token_is(recvbuf, "get_subbuf_size", 0) == 1) {
>>>>> -             do_cmd_get_subbuf_size(recvbuf, src);
>>>>> +             do_cmd_get_subbuf_size(recvbuf, sock);
>>>>>       } else if (nth_token_is(recvbuf, "load_probe_lib", 0) == 1) {
>>>>>               char *libfile;
>>>>>
>>>>> @@ -1016,13 +1015,17 @@ int process_client_cmd(char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>>
>>>>>               free(libfile);
>>>>>       } else if (nth_token_is(recvbuf, "get_subbuffer", 0) == 1) {
>>>>> -             do_cmd_get_subbuffer(recvbuf, src);
>>>>> -     } else if (nth_token_is(recvbuf, "put_subbuffer", 0) == 1) {
>>>>> -             do_cmd_put_subbuffer(recvbuf, src);
>>>>> +             do_cmd_get_subbuffer(recvbuf, sock);
>>>>> +     }
>>>>> +     else if(nth_token_is(recvbuf, "get_buffer_fd", 0) == 1) {
>>>>> +             do_cmd_get_buffer_fd(recvbuf, sock);
>>>>> +     }
>>>>> +     else if(nth_token_is(recvbuf, "put_subbuffer", 0) == 1) {
>>>>> +             do_cmd_put_subbuffer(recvbuf, sock);
>>>>>       } else if (nth_token_is(recvbuf, "set_subbuf_size", 0) == 1) {
>>>>> -             do_cmd_set_subbuf_size(recvbuf, src);
>>>>> +             do_cmd_set_subbuf_size(recvbuf, sock);
>>>>>       } else if (nth_token_is(recvbuf, "set_subbuf_num", 0) == 1) {
>>>>> -             do_cmd_set_subbuf_num(recvbuf, src);
>>>>> +             do_cmd_set_subbuf_num(recvbuf, sock);
>>>>>       } else if (nth_token_is(recvbuf, "enable_marker", 0) == 1) {
>>>>>               char *channel_slash_name = nth_token(recvbuf, 1);
>>>>>               char *channel_name = NULL;
>>>>> @@ -1074,7 +1077,7 @@ int process_client_cmd(char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>>                       goto next_cmd;
>>>>>               }
>>>>>
>>>>> -             result = ustcomm_send_reply(&ustcomm_app.server, reply,
>>>>> src);
>>>>> +             result = ustcomm_send_reply(reply, sock);
>>>>>               if (result) {
>>>>>                       ERR("listener: get_pidunique:
>>>>> ustcomm_send_reply failed");
>>>>>                       goto next_cmd;
>>>>> @@ -1089,10 +1092,10 @@ int process_client_cmd(char *recvbuf, struct
>>>>> ustcomm_source *src)
>>>>>                                   SOCK_DIR);
>>>>>                               goto next_cmd;
>>>>>                       }
>>>>> -                     result =
>>>>> ustcomm_send_reply(&ustcomm_app.server, reply, src);
>>>>> +                     result = ustcomm_send_reply(reply, sock);
>>>>>                       free(reply);
>>>>>               } else {
>>>>> -                     result =
>>>>> ustcomm_send_reply(&ustcomm_app.server, reply, src);
>>>>> +                     result = ustcomm_send_reply(reply, sock);
>>>>>               }
>>>>>               if (result)
>>>>>                       ERR("ustcomm_send_reply failed");
>>>>> @@ -1112,28 +1115,54 @@ next_cmd:
>>>>>       return 0;
>>>>>  }
>>>>>
>>>>> +
>>>>> +
>>>>> +
>>>>> +#define MAX_EVENTS 10
>>>>> +
>>>>> +
>>>>> +
>>>>>  void *listener_main(void *p)
>>>>>  {
>>>>> -     int result;
>>>>> +     struct ustcomm_sock *epoll_sock;
>>>>> +     struct epoll_event events[MAX_EVENTS];
>>>>> +     struct sockaddr addr;
>>>>> +     int accept_fd, nfds, result, i, addr_size;
>>>>>
>>>>>       DBG("LISTENER");
>>>>>
>>>>>       pthread_cleanup_push(listener_cleanup, NULL);
>>>>>
>>>>> -     for (;;) {
>>>>> -             struct mpentries mpent;
>>>>> -
>>>>> -             multipoll_init(&mpent);
>>>>> -
>>>>> -             blocked_consumers_add_to_mp(&mpent);
>>>>> -             ustcomm_mp_add_app_clients(&mpent, &ustcomm_app,
>>>>> process_client_cmd);
>>>>> -
>>>>> -             result = multipoll_poll(&mpent, -1);
>>>>> -             if (result == -1) {
>>>>> -                     ERR("error in multipoll_poll");
>>>>> +     for(;;) {
>>>>> +             nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
>>>>> +             if (nfds == -1) {
>>>>> +                     ERR("epoll_wait");
>>>>> +                     continue;
>>>>>               }
>>>>>
>>>>> -             multipoll_destroy(&mpent);
>>>>> +             for (i = 0; i < nfds; i++) {
>>>>> +                     epoll_sock = (struct ustcomm_sock
>>>>> *)events[i].data.ptr;
>>>>> +                     if (epoll_sock == listen_sock) {
>>>>> +                             addr_size = sizeof(struct sockaddr);
>>>>> +                             accept_fd = accept(epoll_sock->fd,
>>>>> +                                                &addr,
>>>>> +                                                (socklen_t
>>>>> *)&addr_size);
>>>>> +                             if (accept_fd == -1) {
>>>>> +                                     ERR("accept failed");
>>>>> +                                     continue;
>>>>> +                             }
>>>>> +                             ustcomm_init_sock(accept_fd, epoll_fd,
>>>>> +                                              &ust_socks);
>>>>> +                     } else {
>>>>> +                             char *msg = NULL;
>>>>> +                             result =
>>>>> recv_message_conn(epoll_sock->fd, &msg);
>>>>> +                             if (result == 0) {
>>>>> +                                     ustcomm_del_sock(epoll_sock, 0);
>>>>> +                             } else if (msg) {
>>>>> +                                     process_client_cmd(msg,
>>>>> epoll_sock->fd);
>>>>> +                                     free(msg);
>>>>> +                             }
>>>>> +                     }
>>>>> +             }
>>>>>       }
>>>>>
>>>>>       pthread_cleanup_pop(1);
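
For readers following the new listener loop: it relies on epoll handing back whatever pointer was stored in data.ptr at registration time, so no fd-to-struct lookup is needed. Below is a minimal, Linux-only sketch of just that mechanism. struct demo_sock and the demo_* helpers are made-up names for illustration and are not part of the patch.

```c
#include <stddef.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical stand-in for struct ustcomm_sock; only fd is used here. */
struct demo_sock {
	int fd;
};

/*
 * Register s->fd for EPOLLIN and store s itself in data.ptr so the
 * waiter can recover it directly from the returned event.
 */
static int demo_watch(int epoll_fd, struct demo_sock *s)
{
	struct epoll_event ev;

	ev.events = EPOLLIN;
	ev.data.ptr = s;
	return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, s->fd, &ev);
}

/*
 * Wait for a single event and return the demo_sock that became
 * readable, or NULL on error or timeout.
 */
static struct demo_sock *demo_wait_one(int epoll_fd, int timeout_ms)
{
	struct epoll_event ev;

	if (epoll_wait(epoll_fd, &ev, 1, timeout_ms) <= 0)
		return NULL;
	return ev.data.ptr;
}

/*
 * Self-check: watch two pipes, make only the second one readable and
 * verify that the wait hands back the second struct. Returns 1 on
 * success, 0 otherwise.
 */
static int demo_roundtrip(void)
{
	struct demo_sock a, b;
	int pa[2], pb[2], epfd;
	int ok = 0;

	if (pipe(pa) == -1)
		return 0;
	if (pipe(pb) == -1)
		goto close_a;
	epfd = epoll_create(2);
	if (epfd == -1)
		goto close_b;

	a.fd = pa[0];
	b.fd = pb[0];
	if (demo_watch(epfd, &a) == -1 || demo_watch(epfd, &b) == -1)
		goto close_ep;

	if (write(pb[1], "x", 1) != 1)
		goto close_ep;
	ok = (demo_wait_one(epfd, 1000) == &b);

close_ep:
	close(epfd);
close_b:
	close(pb[0]);
	close(pb[1]);
close_a:
	close(pa[0]);
	close(pa[1]);
	return ok;
}
```

In the patch the same comparison (epoll_sock == listen_sock) is what separates "new connection" from "data on an existing connection".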
>>>>> @@ -1183,11 +1212,6 @@ void create_listener(void)
>>>>>       }
>>>>>  }
>>>>>
>>>>> -static int init_socket(void)
>>>>> -{
>>>>> -     return ustcomm_init_app(getpid(), &ustcomm_app);
>>>>> -}
>>>>> -
>>>>>  #define AUTOPROBE_DISABLED      0
>>>>>  #define AUTOPROBE_ENABLE_ALL    1
>>>>>  #define AUTOPROBE_ENABLE_REGEX  2
>>>>> @@ -1225,6 +1249,41 @@ static void auto_probe_connect(struct marker *m)
>>>>>
>>>>>  }
>>>>>
>>>>> +static struct ustcomm_sock * init_app_socket(int epoll_fd)
>>>>> +{
>>>>> +     char *name;
>>>>> +     int result;
>>>>> +     struct ustcomm_sock *sock;
>>>>> +
>>>>> +     result = asprintf(&name, "%s/%d", SOCK_DIR, (int)getpid());
>>>>> +     if (result < 0) {
>>>>> +             ERR("string overflow allocating socket name, "
>>>>> +                 "UST thread bailing");
>>>>> +             return NULL;
>>>>> +     }
>>>>> +
>>>>> +     result = ensure_dir_exists(SOCK_DIR);
>>>>> +     if (result == -1) {
>>>>> +             ERR("Unable to create socket directory %s, UST thread
>>>>> bailing",
>>>>> +                 SOCK_DIR);
>>>>> +             goto free_name;
>>>>> +     }
>>>>> +
>>>>> +     sock = ustcomm_init_named_socket(name, epoll_fd);
>>>>> +     if (!sock) {
>>>>> +             ERR("Error initializing named socket (%s). Check that
>>>>> directory "
>>>>> +                 "exists and that it is writable. UST thread
>>>>> bailing", name);
>>>>> +             goto free_name;
>>>>> +     }
>>>>> +
>>>>> +     free(name);
>>>>> +     return sock;
>>>>> +
>>>>> +free_name:
>>>>> +     free(name);
>>>>> +     return NULL;
>>>>> +}
>>>>> +
>>>>>  static void __attribute__((constructor)) init()
>>>>>  {
>>>>>       int result;
>>>>> @@ -1242,9 +1301,18 @@ static void __attribute__((constructor)) init()
>>>>>
>>>>>       DBG("Tracectl constructor");
>>>>>
>>>>> -     result = init_socket();
>>>>> -     if (result == -1) {
>>>>> -             ERR("init_socket error");
>>>>> +     /* Set up epoll */
>>>>> +     epoll_fd = epoll_create(MAX_EVENTS);
>>>>> +     if (epoll_fd == -1) {
>>>>> +             ERR("epoll_create failed, tracing shutting down");
>>>>> +             return;
>>>>> +     }
>>>>> +
>>>>> +     /* Create the socket */
>>>>> +     listen_sock = init_app_socket(epoll_fd);
>>>>> +     if (!listen_sock) {
>>>>> +             ERR("failed to create application socket,"
>>>>> +                 " tracing shutting down");
>>>>>               return;
>>>>>       }
>>>>>
>>>>> @@ -1451,13 +1519,6 @@ static int trace_recording(void)
>>>>>       return retval;
>>>>>  }
>>>>>
>>>>> -#if 0
>>>>> -static int have_consumer(void)
>>>>> -{
>>>>> -     return !list_empty(&blocked_consumers);
>>>>> -}
>>>>> -#endif
>>>>> -
>>>>>  int restarting_usleep(useconds_t usecs)
>>>>>  {
>>>>>          struct timespec tv;
>>>>> @@ -1545,8 +1606,8 @@ void ust_potential_exec(void)
>>>>>
>>>>>  static void ust_fork(void)
>>>>>  {
>>>>> -     struct blocked_consumer *bc;
>>>>> -     struct blocked_consumer *deletable_bc = NULL;
>>>>> +     struct ust_buffer *buf, *buf_tmp;
>>>>> +     struct ustcomm_sock *sock, *sock_tmp;
>>>>>       int result;
>>>>>
>>>>>       /* FIXME: technically, the locks could have been taken before
>>>>> the fork */
>>>>> @@ -1557,26 +1618,47 @@ static void ust_fork(void)
>>>>>
>>>>>       ltt_trace_stop("auto");
>>>>>       ltt_trace_destroy("auto", 1);
>>>>> -     /* Delete all active connections */
>>>>> -     ustcomm_close_all_connections(&ustcomm_app.server);
>>>>> +     /* Delete all active connections, but leave them in the epoll
>>>>> set */
>>>>> +     list_for_each_entry_safe(sock, sock_tmp, &ust_socks, list) {
>>>>> +             ustcomm_del_sock(sock, 1);
>>>>> +     }
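
A side note on the switch to list_for_each_entry_safe here: entries are unlinked and freed while the list is being walked, so the iterator has to remember the successor before the current entry goes away. The old loop appears to have worked around this by deferring each free by one iteration. The sketch below shows the idea with a made-up minimal list; struct demo_node and the demo_list_* helpers are hypothetical and are not the list.h used by UST.

```c
#include <stdlib.h>

/* Hypothetical circular doubly linked list, for illustration only. */
struct demo_node {
	struct demo_node *prev, *next;
	int fd;
};

static void demo_list_init(struct demo_node *head)
{
	head->prev = head;
	head->next = head;
}

/* Insert a new node right after head. Returns 0 on success, -1 on OOM. */
static int demo_list_add(struct demo_node *head, int fd)
{
	struct demo_node *n = malloc(sizeof(*n));

	if (!n)
		return -1;
	n->fd = fd;
	n->next = head->next;
	n->prev = head;
	head->next->prev = n;
	head->next = n;
	return 0;
}

/*
 * Unlink and free every node. The successor is saved in tmp before
 * the current node is freed, which is the job of a "safe" iterator.
 * Returns the number of nodes released.
 */
static int demo_list_clear(struct demo_node *head)
{
	struct demo_node *cur, *tmp;
	int released = 0;

	for (cur = head->next, tmp = cur->next; cur != head;
	     cur = tmp, tmp = cur->next) {
		cur->prev->next = cur->next;
		cur->next->prev = cur->prev;
		free(cur);
		released++;
	}
	return released;
}
```

With a plain iterator the loop step would read cur->next after free(cur), which is a use after free.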
>>>>>
>>>>>       /* Delete all blocked consumers */
>>>>> -     list_for_each_entry(bc, &blocked_consumers, list) {
>>>>> -             result = close(bc->fd_producer);
>>>>> -             if (result == -1) {
>>>>> +     list_for_each_entry_safe(buf, buf_tmp, &open_buffers_list,
>>>>> +                              open_buffers_list) {
>>>>> +             result = close(buf->data_ready_fd_read);
>>>>> +             if(result == -1) {
>>>>>                       PERROR("close");
>>>>>               }
>>>>> -             free(deletable_bc);
>>>>> -             deletable_bc = bc;
>>>>> -             list_del(&bc->list);
>>>>> +             result = close(buf->data_ready_fd_write);
>>>>> +             if(result == -1) {
>>>>> +                     PERROR("close");
>>>>> +             }
>>>>> +             list_del(&buf->open_buffers_list);
>>>>>       }
>>>>>
>>>>> -     /* free app, keeping socket file */
>>>>> -     ustcomm_fini_app(&ustcomm_app, 1);
>>>>> +     /* Clean up the listener socket and epoll, keeping the socket
>>>>> file */
>>>>> +     ustcomm_del_named_sock(listen_sock, 1);
>>>>> +     close(epoll_fd);
>>>>>
>>>>> +     /* Re-start the launch sequence */
>>>>>       STORE_SHARED(buffers_to_export, 0);
>>>>>       have_listener = 0;
>>>>> -     init_socket();
>>>>> +
>>>>> +     /* Set up epoll */
>>>>> +     epoll_fd = epoll_create(MAX_EVENTS);
>>>>> +     if (epoll_fd == -1) {
>>>>> +             ERR("epoll_create failed, tracing shutting down");
>>>>> +             return;
>>>>> +     }
>>>>> +
>>>>> +     /* Create the socket */
>>>>> +     listen_sock = init_app_socket(epoll_fd);
>>>>> +     if (!listen_sock) {
>>>>> +             ERR("failed to create application socket,"
>>>>> +                 " tracing shutting down");
>>>>> +             return;
>>>>> +     }
>>>>>       create_listener();
>>>>>       ltt_trace_setup("auto");
>>>>>       result = ltt_trace_set_type("auto", "ustrelay");
>>>>> diff --git a/libustcmd/ustcmd.c b/libustcmd/ustcmd.c
>>>>> index c512320..ac90f6c 100644
>>>>> --- a/libustcmd/ustcmd.c
>>>>> +++ b/libustcmd/ustcmd.c
>>>>> @@ -52,7 +52,12 @@ pid_t *ustcmd_get_online_pids(void)
>>>>>                       !!strcmp(dirent->d_name, "ustd")) {
>>>>>
>>>>>                       sscanf(dirent->d_name, "%u", (unsigned int *)
>>>>> &ret[i]);
>>>>> -                     if (pid_is_online(ret[i])) {
>>>>> +                     /* FIXME: Here we previously called
>>>>> pid_is_online, which
>>>>> +                      * always returned 1; it is now replaced with
>>>>> a literal 1.
>>>>> +                      * We need to figure out an intelligent way of
>>>>> solving
>>>>> +                      * this, maybe connect-disconnect.
>>>>> +                      */
>>>>> +                     if (1) {
>>>>>                               ret_size += sizeof(pid_t);
>>>>>                               ret = (pid_t *) realloc(ret, ret_size);
>>>>>                               ++i;
>>>>> @@ -592,17 +597,17 @@ int ustcmd_force_switch(pid_t pid)
>>>>>
>>>>>  int ustcmd_send_cmd(const char *cmd, const pid_t pid, char **reply)
>>>>>  {
>>>>> -     struct ustcomm_connection conn;
>>>>> +     int app_fd;
>>>>>       int retval;
>>>>>
>>>>> -     if (ustcomm_connect_app(pid, &conn)) {
>>>>> +     if (ustcomm_connect_app(pid, &app_fd)) {
>>>>>               ERR("could not connect to PID %u", (unsigned int) pid);
>>>>>               return -1;
>>>>>       }
>>>>>
>>>>> -     retval = ustcomm_send_request(&conn, cmd, reply);
>>>>> +     retval = ustcomm_send_request(app_fd, cmd, reply);
>>>>>
>>>>> -     ustcomm_close_app(&conn);
>>>>> +     close(app_fd);
>>>>>
>>>>>       return retval;
>>>>>  }
>>>>> diff --git a/libustcomm/Makefile.am b/libustcomm/Makefile.am
>>>>> index 2672071..3ae96d5 100644
>>>>> --- a/libustcomm/Makefile.am
>>>>> +++ b/libustcomm/Makefile.am
>>>>> @@ -4,8 +4,7 @@ AM_CFLAGS = -fno-strict-aliasing
>>>>>  noinst_LTLIBRARIES = libustcomm.la
>>>>>  libustcomm_la_SOURCES = \
>>>>>       ustcomm.h \
>>>>> -     ustcomm.c \
>>>>> -     multipoll.h \
>>>>> -     multipoll.c
>>>>> +     ustcomm.c
>>>>> +
>>>>>  libustcomm_la_LDFLAGS = -no-undefined -static
>>>>>  libustcomm_la_CFLAGS = -DUST_COMPONENT="libustcomm" -fPIC
>>>>> -fno-strict-aliasing
>>>>> diff --git a/libustcomm/multipoll.c b/libustcomm/multipoll.c
>>>>> deleted file mode 100644
>>>>> index 80426e3..0000000
>>>>> --- a/libustcomm/multipoll.c
>>>>> +++ /dev/null
>>>>> @@ -1,130 +0,0 @@
>>>>> -/*
>>>>> - * multipoll.c
>>>>> - *
>>>>> - * Copyright (C) 2010 - Pierre-Marc Fournier (pierre-marc dot
>>>>> fournier at polymtl dot ca)
>>>>> - *
>>>>> - * This library is free software; you can redistribute it and/or
>>>>> - * modify it under the terms of the GNU Lesser General Public
>>>>> - * License as published by the Free Software Foundation; either
>>>>> - * version 2.1 of the License, or (at your option) any later version.
>>>>> - *
>>>>> - * This library is distributed in the hope that it will be useful,
>>>>> - * but WITHOUT ANY WARRANTY; without even the implied warranty of
>>>>> - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
>>>>> - * Lesser General Public License for more details.
>>>>> - *
>>>>> - * You should have received a copy of the GNU Lesser General Public
>>>>> - * License along with this library; if not, write to the Free Software
>>>>> - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
>>>>> 02110-1301 USA
>>>>> - */
>>>>> -
>>>>> -/* Multipoll is a framework to poll on several file descriptors and
>>>>> to call
>>>>> - * a specific callback depending on the fd that had activity.
>>>>> - */
>>>>> -
>>>>> -#include <poll.h>
>>>>> -#include <stdlib.h>
>>>>> -#include "multipoll.h"
>>>>> -#include "usterr.h"
>>>>> -
>>>>> -#define INITIAL_N_AVAIL 16
>>>>> -
>>>>> -/* multipoll_init
>>>>> - *
>>>>> - * Initialize an mpentries struct, which is initially empty of any fd.
>>>>> - */
>>>>> -
>>>>> -int multipoll_init(struct mpentries *ent)
>>>>> -{
>>>>> -     ent->n_used = 0;
>>>>> -     ent->n_avail = INITIAL_N_AVAIL;
>>>>> -
>>>>> -     ent->pollfds = (struct pollfd *) zmalloc(sizeof(struct pollfd)
>>>>> * INITIAL_N_AVAIL);
>>>>> -     ent->extras = (struct pollfd_extra *) zmalloc(sizeof(struct
>>>>> pollfd_extra) * INITIAL_N_AVAIL);
>>>>> -
>>>>> -     return 0;
>>>>> -}
>>>>> -
>>>>> -/* multipoll_destroy: free a struct mpentries
>>>>> - */
>>>>> -
>>>>> -int multipoll_destroy(struct mpentries *ent)
>>>>> -{
>>>>> -     int i;
>>>>> -
>>>>> -     for(i=0; i<ent->n_used; i++) {
>>>>> -             if(ent->extras[i].destroy_priv) {
>>>>> -                     ent->extras[i].destroy_priv(ent->extras[i].priv);
>>>>> -             }
>>>>> -     }
>>>>> -
>>>>> -     free(ent->pollfds);
>>>>> -     free(ent->extras);
>>>>> -
>>>>> -     return 0;
>>>>> -}
>>>>> -
>>>>> -/* multipoll_add
>>>>> - *
>>>>> - * Add a file descriptor to be waited on in a struct mpentries.
>>>>> - *
>>>>> - * @ent: the struct mpentries to add an fd to
>>>>> - * @fd: the fd to wait on
>>>>> - * @events: a mask of the types of events to wait on, see the
>>>>> poll(2) man page
>>>>> - * @func: the callback function to be called if there is activity on
>>>>> the fd
>>>>> - * @priv: the private pointer to pass to func
>>>>> - * @destroy_priv: a callback to destroy the priv pointer when the
>>>>> mpentries
>>>>> -                  is destroyed; may be NULL
>>>>> - */
>>>>> -
>>>>> -int multipoll_add(struct mpentries *ent, int fd, short events, int
>>>>> (*func)(void *priv, int fd, short events), void *priv, int
>>>>> (*destroy_priv)(void *))
>>>>> -{
>>>>> -     int cur;
>>>>> -
>>>>> -     if(ent->n_used == ent->n_avail) {
>>>>> -             ent->n_avail *= 2;
>>>>> -             ent->pollfds = (struct pollfd *) realloc(ent->pollfds,
>>>>> sizeof(struct pollfd) * ent->n_avail);
>>>>> -             ent->extras = (struct pollfd_extra *)
>>>>> realloc(ent->extras, sizeof(struct pollfd_extra) * ent->n_avail);
>>>>> -     }
>>>>> -
>>>>> -     cur = ent->n_used;
>>>>> -     ent->n_used++;
>>>>> -
>>>>> -     ent->pollfds[cur].fd = fd;
>>>>> -     ent->pollfds[cur].events = events;
>>>>> -     ent->extras[cur].func = func;
>>>>> -     ent->extras[cur].priv = priv;
>>>>> -     ent->extras[cur].destroy_priv = destroy_priv;
>>>>> -
>>>>> -     return 0;
>>>>> -}
>>>>> -
>>>>> -/* multipoll_poll: do the actual poll on a struct mpentries
>>>>> - *
>>>>> - * File descriptors should have been already added with
>>>>> multipoll_add().
>>>>> - *
>>>>> - * A struct mpentries may be reused for multiple multipoll_poll calls.
>>>>> - *
>>>>> - * @ent: the struct mpentries to poll on.
>>>>> - * @timeout: the timeout after which to return if there was no
>>>>> activity.
>>>>> - */
>>>>> -
>>>>> -int multipoll_poll(struct mpentries *ent, int timeout)
>>>>> -{
>>>>> -     int result;
>>>>> -     int i;
>>>>> -
>>>>> -     result = poll(ent->pollfds, ent->n_used, timeout);
>>>>> -     if(result == -1) {
>>>>> -             PERROR("poll");
>>>>> -             return -1;
>>>>> -     }
>>>>> -
>>>>> -     for(i=0; i<ent->n_used; i++) {
>>>>> -             if(ent->pollfds[i].revents) {
>>>>> -                     ent->extras[i].func(ent->extras[i].priv,
>>>>> ent->pollfds[i].fd, ent->pollfds[i].revents);
>>>>> -             }
>>>>> -     }
>>>>> -
>>>>> -     return 0;
>>>>> -}
>>>>> diff --git a/libustcomm/multipoll.h b/libustcomm/multipoll.h
>>>>> deleted file mode 100644
>>>>> index 8a0124f..0000000
>>>>> --- a/libustcomm/multipoll.h
>>>>> +++ /dev/null
>>>>> @@ -1,44 +0,0 @@
>>>>> -/*
>>>>> - * multipoll.h
>>>>> - *
>>>>> - * Copyright (C) 2010 - Pierre-Marc Fournier (pierre-marc dot
>>>>> fournier at polymtl dot ca)
>>>>> - *
>>>>> - * This library is free software; you can redistribute it and/or
>>>>> - * modify it under the terms of the GNU Lesser General Public
>>>>> - * License as published by the Free Software Foundation; either
>>>>> - * version 2.1 of the License, or (at your option) any later version.
>>>>> - *
>>>>> - * This library is distributed in the hope that it will be useful,
>>>>> - * but WITHOUT ANY WARRANTY; without even the implied warranty of
>>>>> - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
>>>>> - * Lesser General Public License for more details.
>>>>> - *
>>>>> - * You should have received a copy of the GNU Lesser General Public
>>>>> - * License along with this library; if not, write to the Free Software
>>>>> - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
>>>>> 02110-1301 USA
>>>>> - */
>>>>> -
>>>>> -#ifndef UST_MULTIPOLL_H
>>>>> -#define UST_MULTIPOLL_H
>>>>> -
>>>>> -struct pollfd_extra {
>>>>> -     int (*func)(void *priv, int fd, short events);
>>>>> -     void *priv;
>>>>> -
>>>>> -     int (*destroy_priv)(void *priv);
>>>>> -};
>>>>> -
>>>>> -struct mpentries {
>>>>> -     struct pollfd *pollfds;
>>>>> -     struct pollfd_extra *extras;
>>>>> -
>>>>> -     int n_used;
>>>>> -     int n_avail;
>>>>> -};
>>>>> -
>>>>> -extern int multipoll_init(struct mpentries *ent);
>>>>> -extern int multipoll_add(struct mpentries *ent, int fd, short
>>>>> events, int (*func)(void *priv, int fd, short events), void *priv,
>>>>> int (*destroy_priv)(void *));
>>>>> -extern int multipoll_destroy(struct mpentries *ent);
>>>>> -extern int multipoll_poll(struct mpentries *ent, int timeout);
>>>>> -
>>>>> -#endif /* UST_MULTIPOLL_H */
>>>>> diff --git a/libustcomm/ustcomm.c b/libustcomm/ustcomm.c
>>>>> index 567c5d1..d55ac4c 100644
>>>>> --- a/libustcomm/ustcomm.c
>>>>> +++ b/libustcomm/ustcomm.c
>>>>> @@ -25,6 +25,7 @@
>>>>>  #include <sys/un.h>
>>>>>  #include <unistd.h>
>>>>>  #include <poll.h>
>>>>> +#include <sys/epoll.h>
>>>>>  #include <sys/stat.h>
>>>>>
>>>>>  #include <stdio.h>
>>>>> @@ -35,9 +36,6 @@
>>>>>  #include "ustcomm.h"
>>>>>  #include "usterr.h"
>>>>>  #include "share.h"
>>>>> -#include "multipoll.h"
>>>>> -
>>>>> -#define UNIX_PATH_MAX 108
>>>>>
>>>>>  static int mkdir_p(const char *path, mode_t mode)
>>>>>  {
>>>>> @@ -91,430 +89,450 @@ static int mkdir_p(const char *path, mode_t mode)
>>>>>       return retval;
>>>>>  }
>>>>>
>>>>> -static int signal_process(pid_t pid)
>>>>> +static struct sockaddr_un * create_sock_addr(const char *name,
>>>>> +                                          size_t *sock_addr_size)
>>>>>  {
>>>>> -     return 0;
>>>>> -}
>>>>> +     struct sockaddr_un * addr;
>>>>> +     size_t alloc_size;
>>>>>
>>>>> -void ustcomm_init_connection(struct ustcomm_connection *conn)
>>>>> -{
>>>>> -     conn->recv_buf = NULL;
>>>>> -     conn->recv_buf_size = 0;
>>>>> -     conn->recv_buf_alloc = 0;
>>>>> -}
>>>>> +     alloc_size = (size_t) (((struct sockaddr_un *) 0)->sun_path) +
>>>>> +             strlen(name) + 1;
>>>>>
>>>>> -int pid_is_online(pid_t pid) {
>>>>> -     return 1;
>>>>> -}
>>>>> +     addr = malloc(alloc_size);
>>>>> +     if (!addr) {
>>>>> +             ERR("allocating addr failed");
>>>>> +             return NULL;
>>>>> +     }
>>>>>
>>>>> -/* Send a message
>>>>> - *
>>>>> - * @fd: file descriptor to send to
>>>>> - * @msg: a null-terminated string containing the message to send
>>>>> - *
>>>>> - * Return value:
>>>>> - * -1: error
>>>>> - * 0: connection closed
>>>>> - * 1: success
>>>>> - */
>>>>> +     addr->sun_family = AF_UNIX;
>>>>> +     strcpy(addr->sun_path, name);
>>>>> +
>>>>> +     *sock_addr_size = alloc_size;
>>>>> +
>>>>> +     return addr;
>>>>> +}
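
On create_sock_addr(): the address length passed to bind() is the offset of sun_path plus the path length plus the terminating NUL. A possible variant, sketched below, fills a caller-provided struct sockaddr_un instead of allocating one and rejects paths that do not fit in sun_path. This is an alternative under my own assumptions, not what the patch does; demo_fill_sock_addr is a hypothetical name.

```c
#include <stddef.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>

/*
 * Fill *addr for the given path and return the length to pass to
 * bind() or connect(), or 0 if the path does not fit in sun_path.
 */
static socklen_t demo_fill_sock_addr(struct sockaddr_un *addr,
				     const char *name)
{
	size_t len = strlen(name);

	if (len >= sizeof(addr->sun_path))
		return 0;

	memset(addr, 0, sizeof(*addr));
	addr->sun_family = AF_UNIX;
	memcpy(addr->sun_path, name, len + 1);

	/* Header up to sun_path, then the path including its NUL. */
	return (socklen_t)(offsetof(struct sockaddr_un, sun_path) + len + 1);
}
```

Keeping the address on the caller's stack avoids one malloc/free pair per socket, at the cost of limiting the path to the size of sun_path.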
>>>>>
>>>>> -static int send_message_fd(int fd, const char *msg)
>>>>> +struct ustcomm_sock * ustcomm_init_sock(int fd, int epoll_fd,
>>>>> +                                     struct list_head *list)
>>>>>  {
>>>>> -     int result;
>>>>> +     struct epoll_event ev;
>>>>> +     struct ustcomm_sock *sock;
>>>>>
>>>>> -     /* Send including the final \0 */
>>>>> -     result = patient_send(fd, msg, strlen(msg)+1, MSG_NOSIGNAL);
>>>>> -     if(result == -1) {
>>>>> -             if(errno != EPIPE)
>>>>> -                     PERROR("send");
>>>>> -             return -1;
>>>>> +     sock = malloc(sizeof(struct ustcomm_sock));
>>>>> +     if (!sock) {
>>>>> +             perror("malloc: couldn't allocate ustcomm_sock");
>>>>> +             return NULL;
>>>>>       }
>>>>> -     else if(result == 0) {
>>>>> -             return 0;
>>>>> +
>>>>> +     ev.events = EPOLLIN;
>>>>> +     ev.data.ptr = sock;
>>>>> +     sock->fd = fd;
>>>>> +
>>>>> +     if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock->fd, &ev) == -1) {
>>>>> +             perror("epoll_ctl: failed to add socket");
>>>>> +             free(sock);
>>>>> +             return NULL;
>>>>>       }
>>>>>
>>>>> -     DBG("sent message \"%s\"", msg);
>>>>> -     return 1;
>>>>> +     sock->epoll_fd = epoll_fd;
>>>>> +     if (list) {
>>>>> +             list_add(&sock->list, list);
>>>>> +     } else {
>>>>> +             INIT_LIST_HEAD(&sock->list);
>>>>> +     }
>>>>> +
>>>>> +     return sock;
>>>>>  }
>>>>>
>>>>> -/* Called by an app to ask the consumer daemon to connect to it. */
>>>>> +void ustcomm_del_sock(struct ustcomm_sock *sock, int keep_in_epoll)
>>>>> +{
>>>>> +     list_del(&sock->list);
>>>>> +     if (!keep_in_epoll) {
>>>>> +             if (epoll_ctl(sock->epoll_fd, EPOLL_CTL_DEL, sock->fd,
>>>>> NULL) == -1) {
>>>>> +                     PERROR("epoll_ctl: failed to delete socket");
>>>>> +             }
>>>>> +     }
>>>>> +     close(sock->fd);
>>>>> +     free(sock);
>>>>> +}
>>>>>
>>>>> -int ustcomm_request_consumer(pid_t pid, const char *channel)
>>>>> +struct ustcomm_sock * ustcomm_init_named_socket(const char *name,
>>>>> +                                             int epoll_fd)
>>>>>  {
>>>>> -     char path[UNIX_PATH_MAX];
>>>>>       int result;
>>>>> -     char *msg=NULL;
>>>>> -     int retval = 0;
>>>>> -     struct ustcomm_connection conn;
>>>>> -     char *explicit_daemon_socket_path;
>>>>> +     int fd;
>>>>> +     size_t sock_addr_size;
>>>>> +     struct sockaddr_un * addr;
>>>>> +     struct ustcomm_sock *sock;
>>>>>
>>>>> -     explicit_daemon_socket_path = getenv("UST_DAEMON_SOCKET");
>>>>> -     if(explicit_daemon_socket_path) {
>>>>> -             /* user specified explicitly a socket path */
>>>>> -             result = snprintf(path, UNIX_PATH_MAX, "%s",
>>>>> explicit_daemon_socket_path);
>>>>> -     }
>>>>> -     else {
>>>>> -             /* just use the default path */
>>>>> -             result = snprintf(path, UNIX_PATH_MAX, "%s/ustd",
>>>>> SOCK_DIR);
>>>>> +     fd = socket(PF_UNIX, SOCK_STREAM, 0);
>>>>> +     if(fd == -1) {
>>>>> +             PERROR("socket");
>>>>> +             return NULL;
>>>>>       }
>>>>>
>>>>> -     if(result >= UNIX_PATH_MAX) {
>>>>> -             ERR("string overflow allocating socket name");
>>>>> -             return -1;
>>>>> +     addr = create_sock_addr(name, &sock_addr_size);
>>>>> +     if (addr == NULL) {
>>>>> +             ERR("allocating addr, UST thread bailing");
>>>>> +             goto close_sock;
>>>>>       }
>>>>>
>>>>> -     if (asprintf(&msg, "collect %d %s", pid, channel) < 0) {
>>>>> -             ERR("ustcomm_request_consumer : asprintf failed
>>>>> (collect %d/%s)",
>>>>> -                 pid, channel);
>>>>> -             return -1;
>>>>> +     result = access(name, F_OK);
>>>>> +     if(result == 0) {
>>>>> +             /* file exists */
>>>>> +             result = unlink(name);
>>>>> +             if(result == -1) {
>>>>> +                     PERROR("unlink of socket file");
>>>>> +                     goto free_addr;
>>>>> +             }
>>>>> +             DBG("socket already exists; overwriting");
>>>>>       }
>>>>>
>>>>> -     /* don't signal it because it's the daemon */
>>>>> -     result = ustcomm_connect_path(path, &conn, -1);
>>>>> +     result = bind(fd, (struct sockaddr *)addr, sock_addr_size);
>>>>>       if(result == -1) {
>>>>> -             WARN("ustcomm_connect_path failed");
>>>>> -             retval = -1;
>>>>> -             goto del_string;
>>>>> +             PERROR("bind");
>>>>> +             goto free_addr;
>>>>>       }
>>>>>
>>>>> -     result = ustcomm_send_request(&conn, msg, NULL);
>>>>> +     result = listen(fd, 1);
>>>>>       if(result == -1) {
>>>>> -             WARN("ustcomm_send_request failed");
>>>>> -             retval = -1;
>>>>> -             goto disconnect;
>>>>> +             PERROR("listen");
>>>>> +             goto free_addr;
>>>>>       }
>>>>>
>>>>> -     disconnect:
>>>>> -     ustcomm_disconnect(&conn);
>>>>> -     del_string:
>>>>> -     free(msg);
>>>>> +     sock = ustcomm_init_sock(fd, epoll_fd,
>>>>> +                              NULL);
>>>>> +     if (!sock) {
>>>>> +             ERR("failed to create ustcomm_sock");
>>>>> +             goto free_addr;
>>>>> +     }
>>>>>
>>>>> -     return retval;
>>>>> -}
>>>>> +     free(addr);
>>>>>
>>>>> -/* returns 1 to indicate a message was received
>>>>> - * returns 0 to indicate no message was received (end of stream)
>>>>> - * returns -1 to indicate an error
>>>>> - */
>>>>> +     return sock;
>>>>>
>>>>> -#define RECV_INCREMENT 1000
>>>>> -#define RECV_INITIAL_BUF_SIZE 10
>>>>> +free_addr:
>>>>> +     free(addr);
>>>>> +close_sock:
>>>>> +     close(fd);
>>>>>
>>>>> -static int recv_message_fd(int fd, char **recv_buf, int
>>>>> *recv_buf_size, int *recv_buf_alloc, char **msg)
>>>>> +     return NULL;
>>>>> +}
>>>>> +
>>>>> +void ustcomm_del_named_sock(struct ustcomm_sock *sock,
>>>>> +                         int keep_socket_file)
>>>>>  {
>>>>> -     int result;
>>>>> +     int result, fd;
>>>>> +     struct stat st;
>>>>> +     struct sockaddr dummy;
>>>>> +     struct sockaddr_un *sockaddr = NULL;
>>>>> +     int alloc_size;
>>>>>
>>>>> -     /* 1. Check if there is a message in the buf */
>>>>> -     /* 2. If not, do:
>>>>> -           2.1 receive chunk and put it in buffer
>>>>> -        2.2 process full message if there is one
>>>>> -        -- while no message arrived
>>>>> -     */
>>>>> +     fd = sock->fd;
>>>>>
>>>>> -     for(;;) {
>>>>> -             int i;
>>>>> -             int nulfound = 0;
>>>>> +     if(!keep_socket_file) {
>>>>>
>>>>> -             /* Search for full message in buffer */
>>>>> -             for(i=0; i<*recv_buf_size; i++) {
>>>>> -                     if((*recv_buf)[i] == '\0') {
>>>>> -                             nulfound = 1;
>>>>> -                             break;
>>>>> -                     }
>>>>> +             /* Get the socket name */
>>>>> +             alloc_size = sizeof(dummy);
>>>>> +             if (getsockname(fd, &dummy, (socklen_t *)&alloc_size) <
>>>>> 0) {
>>>>> +                     PERROR("getsockname failed");
>>>>> +                     return;
>>>>>               }
>>>>>
>>>>> -             /* Process found message */
>>>>> -             if(nulfound == 1) {
>>>>> -                     char *newbuf;
>>>>> -
>>>>> -                     if(i == 0) {
>>>>> -                             /* problem */
>>>>> -                             WARN("received empty message");
>>>>> -                     }
>>>>> -                     *msg = strndup(*recv_buf, i);
>>>>> -
>>>>> -                     /* Remove processed message from buffer */
>>>>> -                     newbuf = (char *) malloc(*recv_buf_size - (i+1));
>>>>> -                     memcpy(newbuf, *recv_buf + (i+1),
>>>>> *recv_buf_size - (i+1));
>>>>> -                     free(*recv_buf);
>>>>> -                     *recv_buf = newbuf;
>>>>> -                     *recv_buf_size -= (i+1);
>>>>> -                     *recv_buf_alloc -= (i+1);
>>>>> -
>>>>> -                     return 1;
>>>>> +             sockaddr = malloc(alloc_size);
>>>>> +             if (!sockaddr) {
>>>>> +                     ERR("failed to allocate sockaddr");
>>>>> +                     return;
>>>>>               }
>>>>>
>>>>> -             /* Receive a chunk from the fd */
>>>>> -             if(*recv_buf_alloc - *recv_buf_size < RECV_INCREMENT) {
>>>>> -                     *recv_buf_alloc += RECV_INCREMENT -
>>>>> (*recv_buf_alloc - *recv_buf_size);
>>>>> -                     *recv_buf = (char *) realloc(*recv_buf,
>>>>> *recv_buf_alloc);
>>>>> +             if (getsockname(fd, sockaddr, (socklen_t *)&alloc_size)
>>>>> < 0) {
>>>>> +                     PERROR("getsockname failed");
>>>>> +                     goto free_sockaddr;
>>>>>               }
>>>>>
>>>>> -             result = recv(fd, *recv_buf+*recv_buf_size,
>>>>> RECV_INCREMENT, 0);
>>>>> +             /* Destroy socket */
>>>>> +             result = stat(sockaddr->sun_path, &st);
>>>>>               if(result == -1) {
>>>>> -                     if(errno == ECONNRESET) {
>>>>> -                             *recv_buf_size = 0;
>>>>> -                             return 0;
>>>>> -                     }
>>>>> -                     else if(errno == EINTR) {
>>>>> -                             return -1;
>>>>> -                     }
>>>>> -                     else {
>>>>> -                             PERROR("recv");
>>>>> -                             return -1;
>>>>> -                     }
>>>>> +                     PERROR("stat (%s)", sockaddr->sun_path);
>>>>> +                     goto free_sockaddr;
>>>>>               }
>>>>> -             if(result == 0) {
>>>>> -                     return 0;
>>>>> +
>>>>> +             /* Paranoid check before deleting. */
>>>>> +             result = S_ISSOCK(st.st_mode);
>>>>> +             if(!result) {
>>>>> +                     ERR("The socket we are about to delete is not a
>>>>> socket.");
>>>>> +                     goto free_sockaddr;
>>>>>               }
>>>>> -             *recv_buf_size += result;
>>>>>
>>>>> -             /* Go back to the beginning to check if there is a full
>>>>> message in the buffer */
>>>>> +             result = unlink(sockaddr->sun_path);
>>>>> +             if(result == -1) {
>>>>> +                     PERROR("unlink");
>>>>> +             }
>>>>>       }
>>>>>
>>>>> -     DBG("received message \"%s\"", *recv_buf);
>>>>> +     ustcomm_del_sock(sock, keep_socket_file);
>>>>>
>>>>> -     return 1;
>>>>> +free_sockaddr:
>>>>> +     free(sockaddr);
>>>>>
>>>>>  }
>>>>>
>>>>> -static int recv_message_conn(struct ustcomm_connection *conn, char **msg)
>>>>> -{
>>>>> -     return recv_message_fd(conn->fd, &conn->recv_buf, &conn->recv_buf_size, &conn->recv_buf_alloc, msg);
>>>>> -}
>>>>>
>>>>> -int ustcomm_send_reply(struct ustcomm_server *server, char *msg, struct ustcomm_source *src)
>>>>> +/* Called by an app to ask the consumer daemon to connect to it. */
>>>>> +
>>>>> +int ustcomm_request_consumer(pid_t pid, const char *channel)
>>>>>  {
>>>>> -     int result;
>>>>> +     int result, daemon_fd;
>>>>> +     int retval = 0;
>>>>> +     char *msg=NULL;
>>>>> +     char *explicit_daemon_socket_path, *daemon_path;
>>>>>
>>>>> -     result = send_message_fd(src->fd, msg);
>>>>> -     if(result < 0) {
>>>>> -             ERR("error in send_message_fd");
>>>>> +     explicit_daemon_socket_path = getenv("UST_DAEMON_SOCKET");
>>>>> +     if (explicit_daemon_socket_path) {
>>>>> +             /* user specified explicitly a socket path */
>>>>> +             result = asprintf(&daemon_path, "%s", explicit_daemon_socket_path);
>>>>> +     } else {
>>>>> +             /* just use the default path */
>>>>> +             result = asprintf(&daemon_path, "%s/ustd", SOCK_DIR);
>>>>> +     }
>>>>> +     if (result == -1) {
>>>>> +             ERR("string overflow allocating socket name");
>>>>>               return -1;
>>>>>       }
>>>>>
>>>>> -     return 0;
>>>>> -}
>>>>> -
>>>>> -/* Called after a fork. */
>>>>> +     if (asprintf(&msg, "collect %d %s", pid, channel) < 0) {
>>>>> +             ERR("ustcomm_request_consumer : asprintf failed (collect %d/%s)",
>>>>> +                 pid, channel);
>>>>> +             retval = -1;
>>>>> +             goto free_daemon_path;
>>>>> +     }
>>>>>
>>>>> -int ustcomm_close_all_connections(struct ustcomm_server *server)
>>>>> -{
>>>>> -     struct ustcomm_connection *conn;
>>>>> -     struct ustcomm_connection *deletable_conn = NULL;
>>>>> +     result = ustcomm_connect_path(daemon_path, &daemon_fd);
>>>>> +     if (result == -1) {
>>>>> +             WARN("ustcomm_connect_path failed, daemon_path: %s",
>>>>> +                  daemon_path);
>>>>> +             retval = -1;
>>>>> +             goto del_string;
>>>>> +     }
>>>>>
>>>>> -     list_for_each_entry(conn, &server->connections, list) {
>>>>> -             free(deletable_conn);
>>>>> -             deletable_conn = conn;
>>>>> -             ustcomm_close_app(conn);
>>>>> -             list_del(&conn->list);
>>>>> +     result = ustcomm_send_request(daemon_fd, msg, NULL);
>>>>> +     if (result == -1) {
>>>>> +             WARN("ustcomm_send_request failed, daemon path: %s",
>>>>> +                  daemon_path);
>>>>> +             retval = -1;
>>>>>       }
>>>>>
>>>>> -     return 0;
>>>>> +     close(daemon_fd);
>>>>> +del_string:
>>>>> +     free(msg);
>>>>> +free_daemon_path:
>>>>> +     free(daemon_path);
>>>>> +
>>>>> +     return retval;
>>>>>  }
>>>>>
>>>>> -/* @timeout: max blocking time in milliseconds, -1 means infinity
>>>>> - *
>>>>> - * returns 1 to indicate a message was received
>>>>> - * returns 0 to indicate no message was received
>>>>> +/* returns 1 to indicate a message was received
>>>>> + * returns 0 to indicate no message was received (end of stream)
>>>>>   * returns -1 to indicate an error
>>>>>   */
>>>>> -
>>>>> -int ustcomm_recv_message(struct ustcomm_server *server, char **msg, struct ustcomm_source *src, int timeout)
>>>>> +int ustcomm_recv_fd(int sock,
>>>>> +                 struct ustcomm_header *header,
>>>>> +                 char **data, int *fd)
>>>>>  {
>>>>> -     struct pollfd *fds;
>>>>> -     struct ustcomm_connection **conn_table;
>>>>> -     struct ustcomm_connection *conn;
>>>>>       int result;
>>>>>       int retval;
>>>>> -
>>>>> -     for(;;) {
>>>>> -             int idx = 0;
>>>>> -             int n_fds = 1;
>>>>> -
>>>>> -             list_for_each_entry(conn, &server->connections, list) {
>>>>> -                     n_fds++;
>>>>> -             }
>>>>> -
>>>>> -             fds = (struct pollfd *) zmalloc(n_fds * sizeof(struct pollfd));
>>>>> -             if(fds == NULL) {
>>>>> -                     ERR("zmalloc returned NULL");
>>>>> +     struct ustcomm_header peek_header;
>>>>> +     struct iovec iov[2];
>>>>> +     struct msghdr msg;
>>>>> +     struct cmsghdr *cmsg;
>>>>> +     char buf[CMSG_SPACE(sizeof(int))];
>>>>> +
>>>>> +     result = recv(sock, &peek_header, sizeof(peek_header),
>>>>> +                   MSG_PEEK | MSG_WAITALL);
>>>>> +     if (result <= 0) {
>>>>> +             if(errno == ECONNRESET) {
>>>>> +                     return 0;
>>>>> +             } else if (errno == EINTR) {
>>>>> +                     return -1;
>>>>> +             } else if (result < 0) {
>>>>> +                     PERROR("recv");
>>>>>                       return -1;
>>>>>               }
>>>>> +             return 0;
>>>>> +     }
>>>>>
>>>>> -             conn_table = (struct ustcomm_connection **) zmalloc(n_fds * sizeof(struct ustcomm_connection *));
>>>>> -             if(conn_table == NULL) {
>>>>> -                     ERR("zmalloc returned NULL");
>>>>> -                     retval = -1;
>>>>> -                     goto free_fds_return;
>>>>> -             }
>>>>> +     memset(&msg, 0, sizeof(msg));
>>>>>
>>>>> -             /* special idx 0 is for listening socket */
>>>>> -             fds[idx].fd = server->listen_fd;
>>>>> -             fds[idx].events = POLLIN;
>>>>> -             idx++;
>>>>> +     iov[0].iov_base = (char *)header;
>>>>> +     iov[0].iov_len = sizeof(struct ustcomm_header);
>>>>>
>>>>> -             list_for_each_entry(conn, &server->connections, list) {
>>>>> -                     fds[idx].fd = conn->fd;
>>>>> -                     fds[idx].events = POLLIN;
>>>>> -                     conn_table[idx] = conn;
>>>>> -                     idx++;
>>>>> -             }
>>>>> +     msg.msg_iov = iov;
>>>>> +     msg.msg_iovlen = 1;
>>>>>
>>>>> -             result = poll(fds, n_fds, timeout);
>>>>> -             if(result == -1 && errno == EINTR) {
>>>>> -                     /* That's ok. ustd receives signals to notify it must shutdown. */
>>>>> -                     retval = -1;
>>>>> -                     goto free_conn_table_return;
>>>>> -             }
>>>>> -             else if(result == -1) {
>>>>> -                     PERROR("poll");
>>>>> -                     retval = -1;
>>>>> -                     goto free_conn_table_return;
>>>>> +     if (peek_header.size) {
>>>>> +             if (peek_header.size < 0 || peek_header.size > 100) {
>>>>> +                     WARN("big peek header! %d", peek_header.size);
>>>>>               }
>>>>> -             else if(result == 0) {
>>>>> -                     retval = 0;
>>>>> -                     goto free_conn_table_return;
>>>>> +             *data = malloc(peek_header.size);
>>>>> +             if (!*data) {
>>>>> +                     ERR("failed to allocate space for message");
>>>>>               }
>>>>>
>>>>> -             if(fds[0].revents) {
>>>>> -                     struct ustcomm_connection *newconn;
>>>>> -                     int newfd;
>>>>> +             iov[1].iov_base = (char *)*data;
>>>>> +             iov[1].iov_len = peek_header.size;
>>>>>
>>>>> -                     result = newfd = accept(server->listen_fd, NULL, NULL);
>>>>> -                     if(result == -1) {
>>>>> -                             PERROR("accept");
>>>>> -                             retval = -1;
>>>>> -                             goto free_conn_table_return;
>>>>> -                     }
>>>>> +             msg.msg_iovlen++;
>>>>> +     }
>>>>>
>>>>> -                     newconn = (struct ustcomm_connection *) zmalloc(sizeof(struct ustcomm_connection));
>>>>> -                     if(newconn == NULL) {
>>>>> -                             ERR("zmalloc returned NULL");
>>>>> -                             return -1;
>>>>> -                     }
>>>>> +     if (fd && peek_header.fd_included) {
>>>>> +             msg.msg_control = buf;
>>>>> +             msg.msg_controllen = sizeof(buf);
>>>>> +     }
>>>>>
>>>>> -                     ustcomm_init_connection(newconn);
>>>>> -                     newconn->fd = newfd;
>>>>> +     result = recvmsg(sock, &msg,
>>>>> +                      MSG_WAITALL);
>>>>>
>>>>> -                     list_add(&newconn->list, &server->connections);
>>>>> +     if (result <= 0) {
>>>>> +             if(errno == ECONNRESET) {
>>>>> +                     retval = 0;
>>>>> +             } else if (errno == EINTR) {
>>>>> +                     retval = -1;
>>>>> +             } else if (result < 0) {
>>>>> +                     PERROR("recvmsg");
>>>>> +                     retval = -1;
>>>>> +             } else {
>>>>> +                     retval = 0;
>>>>>               }
>>>>> -
>>>>> -             for(idx=1; idx<n_fds; idx++) {
>>>>> -                     if(fds[idx].revents) {
>>>>> -                             retval = recv_message_conn(conn_table[idx], msg);
>>>>> -                             if(src)
>>>>> -                                     src->fd = fds[idx].fd;
>>>>> -
>>>>> -                             if(retval == 0) {
>>>>> -                                     /* connection finished */
>>>>> -                                     list_for_each_entry(conn, &server->connections, list) {
>>>>> -                                             if(conn->fd == fds[idx].fd) {
>>>>> -                                                     ustcomm_close_app(conn);
>>>>> -                                                     list_del(&conn->list);
>>>>> -                                                     free(conn);
>>>>> -                                                     break;
>>>>> -                                             }
>>>>> -                                     }
>>>>> -                             }
>>>>> -                             else {
>>>>> -                                     goto free_conn_table_return;
>>>>> -                             }
>>>>> +             free(*data);
>>>>> +             return retval;
>>>>> +     }
>>>>> +
>>>>> +     if (fd && peek_header.fd_included) {
>>>>> +             cmsg = CMSG_FIRSTHDR(&msg);
>>>>> +             result = 0;
>>>>> +             while (cmsg != NULL) {
>>>>> +                     if (cmsg->cmsg_level == SOL_SOCKET
>>>>> +                         && cmsg->cmsg_type  == SCM_RIGHTS) {
>>>>> +                             *fd = *(int *) CMSG_DATA(cmsg);
>>>>> +                             result = 1;
>>>>> +                             break;
>>>>>                       }
>>>>> +                     cmsg = CMSG_NXTHDR(&msg, cmsg);
>>>>> +             }
>>>>> +             if (!result) {
>>>>> +                     ERR("Failed to receive file descriptor\n");
>>>>>               }
>>>>> -
>>>>> -             free(fds);
>>>>> -             free(conn_table);
>>>>>       }
>>>>>
>>>>> -free_conn_table_return:
>>>>> -     free(conn_table);
>>>>> -free_fds_return:
>>>>> -     free(fds);
>>>>> -     return retval;
>>>>> +     return 1;
>>>>>  }
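
A note on the peek-then-read pattern in ustcomm_recv_fd() above: the size taken from the peeked header drives the malloc() and the second iovec, so it seems worth bounding it and stopping on allocation failure before calling recvmsg(). Below is a minimal sketch of that idea, not the patch's code. struct sketch_header, SKETCH_MAX_PAYLOAD and both function names are hypothetical, and the 4096 cap is an arbitrary assumption.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>

/* Assumed upper bound on a payload; pick whatever fits the protocol. */
#define SKETCH_MAX_PAYLOAD 4096

/* Hypothetical stand-in for struct ustcomm_header (size field only). */
struct sketch_header {
	long size;
};

/* Send header and payload with a single sendmsg(). Returns 0 or -1. */
static int sketch_send_framed(int sock, const char *data, long size)
{
	struct sketch_header hdr;
	struct iovec iov[2];
	struct msghdr msg;
	ssize_t n;

	memset(&hdr, 0, sizeof(hdr));
	hdr.size = size;
	memset(&msg, 0, sizeof(msg));
	iov[0].iov_base = &hdr;
	iov[0].iov_len = sizeof(hdr);
	iov[1].iov_base = (void *)data;
	iov[1].iov_len = (size_t)size;
	msg.msg_iov = iov;
	msg.msg_iovlen = size > 0 ? 2 : 1;

	n = sendmsg(sock, &msg, MSG_NOSIGNAL);
	return n == (ssize_t)(sizeof(hdr) + (size_t)size) ? 0 : -1;
}

/* Returns 1 on a message, 0 on end of stream, -1 on error.
 * On anything other than 1, *data is left NULL. */
static int sketch_recv_framed(int sock, struct sketch_header *hdr, char **data)
{
	struct sketch_header peek;
	struct iovec iov[2];
	struct msghdr msg;
	ssize_t n, expected;

	assert(hdr != NULL && data != NULL);
	*data = NULL;

	/* Look at the header without consuming it. */
	n = recv(sock, &peek, sizeof(peek), MSG_PEEK | MSG_WAITALL);
	if (n == 0)
		return 0;
	if (n != (ssize_t)sizeof(peek))
		return -1;

	/* Reject sizes we are not prepared to allocate. */
	if (peek.size < 0 || peek.size > SKETCH_MAX_PAYLOAD)
		return -1;

	memset(&msg, 0, sizeof(msg));
	iov[0].iov_base = hdr;
	iov[0].iov_len = sizeof(*hdr);
	msg.msg_iov = iov;
	msg.msg_iovlen = 1;

	if (peek.size > 0) {
		*data = malloc((size_t)peek.size);
		if (*data == NULL)
			return -1;
		iov[1].iov_base = *data;
		iov[1].iov_len = (size_t)peek.size;
		msg.msg_iovlen = 2;
	}

	/* Now consume header and payload together. */
	expected = (ssize_t)sizeof(*hdr) + (ssize_t)peek.size;
	n = recvmsg(sock, &msg, MSG_WAITALL);
	if (n != expected) {
		free(*data);
		*data = NULL;
		return n == 0 ? 0 : -1;
	}
	return 1;
}
```

On success the caller owns *data and frees it. Setting *data to NULL up front also makes the error-path free() safe when the message carried no payload.
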
>>>>>
>>>>> -int ustcomm_ustd_recv_message(struct ustcomm_ustd *ustd, char **msg, struct ustcomm_source *src, int timeout)
>>>>> +int ustcomm_recv(int sock,
>>>>> +              struct ustcomm_header *header,
>>>>> +              char **data)
>>>>>  {
>>>>> -     return ustcomm_recv_message(&ustd->server, msg, src, timeout);
>>>>> +     return ustcomm_recv_fd(sock, header, data, NULL);
>>>>>  }
>>>>>
>>>>> -int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm_source *src, int timeout)
>>>>> +
>>>>> +int recv_message_conn(int sock, char **msg)
>>>>>  {
>>>>> -     return ustcomm_recv_message(&app->server, msg, src, timeout);
>>>>> -}
>>>>> +     struct ustcomm_header header;
>>>>>
>>>>> -/* This removes src from the list of active connections of app.
>>>>> - */
>>>>> +     return ustcomm_recv(sock, &header, msg);
>>>>> +}
>>>>>
>>>>> -int ustcomm_app_detach_client(struct ustcomm_app *app, struct ustcomm_source *src)
>>>>> +int ustcomm_send_fd(int sock,
>>>>> +                 const struct ustcomm_header *header,
>>>>> +                 const char *data,
>>>>> +                 int *fd)
>>>>>  {
>>>>> -     struct ustcomm_server *server = (struct ustcomm_server *)app;
>>>>> -     struct ustcomm_connection *conn;
>>>>> +     struct iovec iov[2];
>>>>> +     struct msghdr msg;
>>>>> +     int result;
>>>>> +     struct cmsghdr *cmsg;
>>>>> +     char buf[CMSG_SPACE(sizeof(int))];
>>>>> +
>>>>> +     memset(&msg, 0, sizeof(msg));
>>>>> +
>>>>> +     iov[0].iov_base = (char *)header;
>>>>> +     iov[0].iov_len = sizeof(struct ustcomm_header);
>>>>> +
>>>>> +     msg.msg_iov = iov;
>>>>> +     msg.msg_iovlen = 1;
>>>>> +
>>>>> +     if (header->size) {
>>>>> +             iov[1].iov_base = (char *)data;
>>>>> +             iov[1].iov_len = header->size;
>>>>> +
>>>>> +             msg.msg_iovlen++;
>>>>>
>>>>> -     list_for_each_entry(conn, &server->connections, list) {
>>>>> -             if(conn->fd == src->fd) {
>>>>> -                     list_del(&conn->list);
>>>>> -                     goto found;
>>>>> -             }
>>>>>       }
>>>>>
>>>>> -     return -1;
>>>>> -found:
>>>>> -     return src->fd;
>>>>> +     if (fd && header->fd_included) {
>>>>> +             msg.msg_control = buf;
>>>>> +             msg.msg_controllen = sizeof(buf);
>>>>> +             cmsg = CMSG_FIRSTHDR(&msg);
>>>>> +             cmsg->cmsg_level = SOL_SOCKET;
>>>>> +             cmsg->cmsg_type = SCM_RIGHTS;
>>>>> +             cmsg->cmsg_len = CMSG_LEN(sizeof(int));
>>>>> +             *(int *) CMSG_DATA(cmsg) = *fd;
>>>>> +             msg.msg_controllen = cmsg->cmsg_len;
>>>>> +     }
>>>>> +
>>>>> +     result = sendmsg(sock, &msg, MSG_NOSIGNAL);
>>>>> +     if (result < 0 && errno != EPIPE) {
>>>>> +             PERROR("sendmsg failed");
>>>>> +     }
>>>>> +     return result;
>>>>>  }
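
For readers less familiar with SCM_RIGHTS, here is a small standalone sketch of the descriptor-passing technique that ustcomm_send_fd() and ustcomm_recv_fd() above rely on. It is an illustration under assumptions, not the patch's implementation. The sketch_* names are hypothetical, the control buffer sits in a union to guarantee cmsghdr alignment, and memcpy() is used instead of casting CMSG_DATA() to int *.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>

/* Control buffer sized for exactly one int, aligned for struct cmsghdr. */
union sketch_ctrl {
	char buf[CMSG_SPACE(sizeof(int))];
	struct cmsghdr align;
};

/* Send one marker byte with fd_to_send attached. Returns 0 or -1. */
static int sketch_send_fd(int sock, int fd_to_send)
{
	char byte = 'F';
	struct iovec iov;
	union sketch_ctrl ctrl;
	struct msghdr msg;
	struct cmsghdr *cmsg;

	memset(&msg, 0, sizeof(msg));
	memset(&ctrl, 0, sizeof(ctrl));
	iov.iov_base = &byte;
	iov.iov_len = 1;
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	msg.msg_control = ctrl.buf;
	msg.msg_controllen = sizeof(ctrl.buf);

	cmsg = CMSG_FIRSTHDR(&msg);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	cmsg->cmsg_len = CMSG_LEN(sizeof(int));
	memcpy(CMSG_DATA(cmsg), &fd_to_send, sizeof(int));

	return sendmsg(sock, &msg, MSG_NOSIGNAL) == 1 ? 0 : -1;
}

/* Receive the marker byte and the attached descriptor. Returns 0 or -1. */
static int sketch_recv_fd(int sock, int *fd_out)
{
	char byte;
	struct iovec iov;
	union sketch_ctrl ctrl;
	struct msghdr msg;
	struct cmsghdr *cmsg;

	assert(fd_out != NULL);

	memset(&msg, 0, sizeof(msg));
	memset(&ctrl, 0, sizeof(ctrl));
	iov.iov_base = &byte;
	iov.iov_len = 1;
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	msg.msg_control = ctrl.buf;
	msg.msg_controllen = sizeof(ctrl.buf);

	if (recvmsg(sock, &msg, 0) <= 0)
		return -1;

	/* Walk the control messages looking for the descriptor. */
	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL;
	     cmsg = CMSG_NXTHDR(&msg, cmsg)) {
		if (cmsg->cmsg_level == SOL_SOCKET
		    && cmsg->cmsg_type == SCM_RIGHTS) {
			memcpy(fd_out, CMSG_DATA(cmsg), sizeof(int));
			return 0;
		}
	}
	return -1;
}
```

The descriptor that arrives is a new number in the receiving process that refers to the same open file, so the receiver is responsible for closing it.
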
>>>>>
>>>>> -static int init_named_socket(const char *name, char **path_out)
>>>>> +int ustcomm_send(int sock,
>>>>> +              const struct ustcomm_header *header,
>>>>> +              const char *data)
>>>>>  {
>>>>> -     int result;
>>>>> -     int fd;
>>>>> +     return ustcomm_send_fd(sock, header, data, NULL);
>>>>> +}
>>>>>
>>>>> -     struct sockaddr_un addr;
>>>>> -
>>>>> -     result = fd = socket(PF_UNIX, SOCK_STREAM, 0);
>>>>> -     if(result == -1) {
>>>>> -             PERROR("socket");
>>>>> -             return -1;
>>>>> -     }
>>>>> +int ustcomm_send_reply(char *msg, int sock)
>>>>> +{
>>>>> +     int result;
>>>>> +     struct ustcomm_header header;
>>>>>
>>>>> -     addr.sun_family = AF_UNIX;
>>>>> +     memset(&header, 0, sizeof(header));
>>>>>
>>>>> -     strncpy(addr.sun_path, name, UNIX_PATH_MAX);
>>>>> -     addr.sun_path[UNIX_PATH_MAX-1] = '\0';
>>>>> +     header.size = strlen(msg) + 1;
>>>>>
>>>>> -     result = access(name, F_OK);
>>>>> -     if(result == 0) {
>>>>> -             /* file exists */
>>>>> -             result = unlink(name);
>>>>> -             if(result == -1) {
>>>>> -                     PERROR("unlink of socket file");
>>>>> -                     goto close_sock;
>>>>> -             }
>>>>> -             DBG("socket already exists; overwriting");
>>>>> +     result = ustcomm_send(sock, &header, msg);
>>>>> +     if(result < 0) {
>>>>> +             ERR("error in ustcomm_send");
>>>>> +             return result;
>>>>>       }
>>>>>
>>>>> -     result = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
>>>>> -     if(result == -1) {
>>>>> -             PERROR("bind");
>>>>> -             goto close_sock;
>>>>> -     }
>>>>> +     return 0;
>>>>> +}
>>>>>
>>>>> -     result = listen(fd, 1);
>>>>> -     if(result == -1) {
>>>>> -             PERROR("listen");
>>>>> -             goto close_sock;
>>>>> -     }
>>>>> +int ustcomm_send_req(int sock,
>>>>> +                  const struct ustcomm_header *req_header,
>>>>> +                  const char *data,
>>>>> +                  char **response)
>>>>> +{
>>>>> +     int result;
>>>>> +     struct ustcomm_header res_header;
>>>>>
>>>>> -     if(path_out) {
>>>>> -             *path_out = strdup(addr.sun_path);
>>>>> +     result = ustcomm_send(sock, req_header, data);
>>>>> +     if ( result <= 0) {
>>>>> +             return result;
>>>>>       }
>>>>>
>>>>> -     return fd;
>>>>> +     if (!response) {
>>>>> +             return 1;
>>>>> +     }
>>>>>
>>>>> -     close_sock:
>>>>> -     close(fd);
>>>>> +     return ustcomm_recv(sock,
>>>>> +                         &res_header,
>>>>> +                         response);
>>>>>
>>>>> -     return -1;
>>>>>  }
>>>>>
>>>>>  /*
>>>>> @@ -527,27 +545,17 @@ static int init_named_socket(const char *name, char **path_out)
>>>>>   * ECONNRESET, which is normal when the application dies.
>>>>>   */
>>>>>
>>>>> -int ustcomm_send_request(struct ustcomm_connection *conn, const char *req, char **reply)
>>>>> +int ustcomm_send_request(int sock, const char *req, char **reply)
>>>>>  {
>>>>> -     int result;
>>>>> +     struct ustcomm_header req_header;
>>>>>
>>>>> -     /* Send including the final \0 */
>>>>> -     result = send_message_fd(conn->fd, req);
>>>>> -     if(result != 1)
>>>>> -             return result;
>>>>> +     req_header.size = strlen(req) + 1;
>>>>>
>>>>> -     if(!reply)
>>>>> -             return 1;
>>>>> +     return ustcomm_send_req(sock,
>>>>> +                             &req_header,
>>>>> +                             req,
>>>>> +                             reply);
>>>>>
>>>>> -     result = recv_message_conn(conn, reply);
>>>>> -     if(result == -1) {
>>>>> -             return -1;
>>>>> -     }
>>>>> -     else if(result == 0) {
>>>>> -             return 0;
>>>>> -     }
>>>>> -
>>>>> -     return 1;
>>>>>  }
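
In ustcomm_send_request() above only req_header.size is assigned before the header is handed to ustcomm_send_req(), so the remaining fields go out with whatever was on the stack. A minimal sketch of zeroing the header first, the way ustcomm_send_reply() does with memset(), could look like this. struct sketch_header copies the fields of struct ustcomm_header from the ustcomm.h hunk in this patch, and the helper name is hypothetical.

```c
#include <assert.h>
#include <string.h>

/* Field-for-field copy of struct ustcomm_header as declared in this patch. */
struct sketch_header {
	int type;
	long size;
	int command;
	int response;
	int fd_included;
};

/* Hypothetical helper: clear every field and any padding, then set
 * size for a NUL-terminated string payload (terminator included). */
static void sketch_init_string_header(struct sketch_header *hdr,
				      const char *payload)
{
	assert(hdr != NULL && payload != NULL);
	memset(hdr, 0, sizeof(*hdr));
	hdr->size = (long)strlen(payload) + 1;
}
```

Clearing the whole struct also keeps padding bytes deterministic, which matters because the header is written to the socket as raw memory.
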
>>>>>
>>>>>  /* Return value:
>>>>> @@ -555,52 +563,45 @@ int ustcomm_send_request(struct ustcomm_connection *conn, const char *req, char
>>>>>   * -1: error
>>>>>   */
>>>>>
>>>>> -int ustcomm_connect_path(const char *path, struct ustcomm_connection *conn, pid_t signalpid)
>>>>> +int ustcomm_connect_path(const char *name, int *connection_fd)
>>>>>  {
>>>>> -     int fd;
>>>>> -     int result;
>>>>> -     struct sockaddr_un addr;
>>>>> +     int result, fd;
>>>>> +     size_t sock_addr_size;
>>>>> +     struct sockaddr_un *addr;
>>>>>
>>>>> -     ustcomm_init_connection(conn);
>>>>> -
>>>>> -     result = fd = socket(PF_UNIX, SOCK_STREAM, 0);
>>>>> -     if(result == -1) {
>>>>> +     fd = socket(PF_UNIX, SOCK_STREAM, 0);
>>>>> +     if(fd == -1) {
>>>>>               PERROR("socket");
>>>>>               return -1;
>>>>>       }
>>>>>
>>>>> -     addr.sun_family = AF_UNIX;
>>>>> -
>>>>> -     result = snprintf(addr.sun_path, UNIX_PATH_MAX, "%s", path);
>>>>> -     if(result >= UNIX_PATH_MAX) {
>>>>> -             ERR("string overflow allocating socket name");
>>>>> -             return -1;
>>>>> -     }
>>>>> -
>>>>> -     if(signalpid >= 0) {
>>>>> -             result = signal_process(signalpid);
>>>>> -             if(result == -1) {
>>>>> -                     ERR("could not signal process");
>>>>> -                     return -1;
>>>>> -             }
>>>>> +     addr = create_sock_addr(name, &sock_addr_size);
>>>>> +     if (addr == NULL) {
>>>>> +             ERR("allocating addr failed");
>>>>> +             goto close_sock;
>>>>>       }
>>>>>
>>>>> -     result = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
>>>>> +     result = connect(fd, (struct sockaddr *)addr, sock_addr_size);
>>>>>       if(result == -1) {
>>>>> -             PERROR("connect (path=%s)", path);
>>>>> -             return -1;
>>>>> +             PERROR("connect (path=%s)", name);
>>>>> +             goto free_sock_addr;
>>>>>       }
>>>>>
>>>>> -     conn->fd = fd;
>>>>> +     *connection_fd = fd;
>>>>> +
>>>>> +     free(addr);
>>>>>
>>>>>       return 0;
>>>>> -}
>>>>>
>>>>> -int ustcomm_disconnect(struct ustcomm_connection *conn)
>>>>> -{
>>>>> -     return close(conn->fd);
>>>>> +free_sock_addr:
>>>>> +     free(addr);
>>>>> +close_sock:
>>>>> +     close(fd);
>>>>> +
>>>>> +     return -1;
>>>>>  }
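
ustcomm_connect_path() above depends on create_sock_addr(), whose body is not part of this section of the patch. Purely as an illustration of what such a helper might do, here is a sketch that builds a heap-allocated sockaddr_un and refuses names that do not fit in sun_path. The function name and behaviour are assumptions, not the patch's implementation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>

/* Hypothetical helper: returns a malloc'ed address for a UNIX socket
 * path, or NULL if the name is too long or allocation fails.
 * *len_out receives the length to pass to connect() or bind(). */
static struct sockaddr_un *sketch_make_unix_addr(const char *name,
						 socklen_t *len_out)
{
	struct sockaddr_un *addr;
	size_t name_len;

	assert(name != NULL && len_out != NULL);

	name_len = strlen(name);
	if (name_len >= sizeof(addr->sun_path))
		return NULL;	/* refuse rather than silently truncate */

	addr = calloc(1, sizeof(*addr));
	if (addr == NULL)
		return NULL;

	addr->sun_family = AF_UNIX;
	memcpy(addr->sun_path, name, name_len + 1);
	*len_out = (socklen_t)(offsetof(struct sockaddr_un, sun_path)
			       + name_len + 1);
	return addr;
}
```

The caller frees the returned pointer once connect() or bind() has been called, matching the free(addr) on both exit paths of ustcomm_connect_path().
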
>>>>>
>>>>> +
>>>>>  /* Open a connection to a traceable app.
>>>>>   *
>>>>>   * Return value:
>>>>> @@ -608,35 +609,30 @@ int ustcomm_disconnect(struct ustcomm_connection *conn)
>>>>>   * -1: error
>>>>>   */
>>>>>
>>>>> -int ustcomm_connect_app(pid_t pid, struct ustcomm_connection *conn)
>>>>> +int ustcomm_connect_app(pid_t pid, int *app_fd)
>>>>>  {
>>>>>       int result;
>>>>> -     char path[UNIX_PATH_MAX];
>>>>> -
>>>>> +     int retval = 0;
>>>>> +     char *name;
>>>>>
>>>>> -     result = snprintf(path, UNIX_PATH_MAX, "%s/%d", SOCK_DIR, pid);
>>>>> -     if(result >= UNIX_PATH_MAX) {
>>>>> -             ERR("string overflow allocating socket name");
>>>>> +     result = asprintf(&name, "%s/%d", SOCK_DIR, pid);
>>>>> +     if (result < 0) {
>>>>> +             ERR("failed to allocate socket name");
>>>>>               return -1;
>>>>>       }
>>>>>
>>>>> -     return ustcomm_connect_path(path, conn, pid);
>>>>> -}
>>>>> -
>>>>> -/* Close a connection to a traceable app. It frees the
>>>>> - * resources. It however does not free the
>>>>> - * ustcomm_connection itself.
>>>>> - */
>>>>> +     result = ustcomm_connect_path(name, app_fd);
>>>>> +     if (result < 0) {
>>>>> +             ERR("failed to connect to app");
>>>>> +             retval = -1;
>>>>> +     }
>>>>>
>>>>> -int ustcomm_close_app(struct ustcomm_connection *conn)
>>>>> -{
>>>>> -     close(conn->fd);
>>>>> -     free(conn->recv_buf);
>>>>> +     free(name);
>>>>>
>>>>> -     return 0;
>>>>> +     return retval;
>>>>>  }
>>>>>
>>>>> -static int ensure_dir_exists(const char *dir)
>>>>> +int ensure_dir_exists(const char *dir)
>>>>>  {
>>>>>       struct stat st;
>>>>>       int result;
>>>>> @@ -663,139 +659,10 @@ static int ensure_dir_exists(const char *dir)
>>>>>       return 0;
>>>>>  }
>>>>>
>>>>> -/* Called by an application to initialize its server so daemons can
>>>>> - * connect to it.
>>>>> - */
>>>>> -
>>>>> -int ustcomm_init_app(pid_t pid, struct ustcomm_app *handle)
>>>>> -{
>>>>> -     int result;
>>>>> -     char *name;
>>>>> -
>>>>> -     result = asprintf(&name, "%s/%d", SOCK_DIR, (int)pid);
>>>>> -     if(result >= UNIX_PATH_MAX) {
>>>>> -             ERR("string overflow allocating socket name");
>>>>> -             return -1;
>>>>> -     }
>>>>> -
>>>>> -     result = ensure_dir_exists(SOCK_DIR);
>>>>> -     if(result == -1) {
>>>>> -             ERR("Unable to create socket directory %s", SOCK_DIR);
>>>>> -             return -1;
>>>>> -     }
>>>>> -
>>>>> -     handle->server.listen_fd = init_named_socket(name, &(handle->server.socketpath));
>>>>> -     if(handle->server.listen_fd < 0) {
>>>>> -             ERR("Error initializing named socket (%s). Check that directory exists and that it is writable.", name);
>>>>> -             goto free_name;
>>>>> -     }
>>>>> -     free(name);
>>>>> -
>>>>> -     INIT_LIST_HEAD(&handle->server.connections);
>>>>> -
>>>>> -     return 0;
>>>>> -
>>>>> -free_name:
>>>>> -     free(name);
>>>>> -     return -1;
>>>>> -}
>>>>> -
>>>>>  /* Used by the daemon to initialize its server so applications
>>>>>   * can connect to it.
>>>>>   */
>>>>>
>>>>> -int ustcomm_init_ustd(struct ustcomm_ustd *handle, const char *sock_path)
>>>>> -{
>>>>> -     char *name;
>>>>> -     int retval = 0;
>>>>> -
>>>>> -     if(sock_path) {
>>>>> -             if (asprintf(&name, "%s", sock_path) < 0) {
>>>>> -                     ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)",
>>>>> -                         sock_path);
>>>>> -                     return -1;
>>>>> -             }
>>>>> -     }
>>>>> -     else {
>>>>> -             int result;
>>>>> -
>>>>> -             /* Only check if socket dir exists if we are using the default directory */
>>>>> -             result = ensure_dir_exists(SOCK_DIR);
>>>>> -             if(result == -1) {
>>>>> -                     ERR("Unable to create socket directory %s", SOCK_DIR);
>>>>> -                     return -1;
>>>>> -             }
>>>>> -
>>>>> -             if (asprintf(&name, "%s/%s", SOCK_DIR, "ustd") < 0) {
>>>>> -                     ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)",
>>>>> -                         SOCK_DIR);
>>>>> -                     return -1;
>>>>> -             }
>>>>> -     }
>>>>> -
>>>>> -     handle->server.listen_fd = init_named_socket(name, &handle->server.socketpath);
>>>>> -     if(handle->server.listen_fd < 0) {
>>>>> -             ERR("error initializing named socket at %s", name);
>>>>> -             retval = -1;
>>>>> -             goto free_name;
>>>>> -     }
>>>>> -
>>>>> -     INIT_LIST_HEAD(&handle->server.connections);
>>>>> -
>>>>> -free_name:
>>>>> -     free(name);
>>>>> -
>>>>> -     return retval;
>>>>> -}
>>>>> -
>>>>> -static void ustcomm_fini_server(struct ustcomm_server *server, int keep_socket_file)
>>>>> -{
>>>>> -     int result;
>>>>> -     struct stat st;
>>>>> -
>>>>> -     if(!keep_socket_file) {
>>>>> -             /* Destroy socket */
>>>>> -             result = stat(server->socketpath, &st);
>>>>> -             if(result == -1) {
>>>>> -                     PERROR("stat (%s)", server->socketpath);
>>>>> -                     return;
>>>>> -             }
>>>>> -
>>>>> -             /* Paranoid check before deleting. */
>>>>> -             result = S_ISSOCK(st.st_mode);
>>>>> -             if(!result) {
>>>>> -                     ERR("The socket we are about to delete is not a socket.");
>>>>> -                     return;
>>>>> -             }
>>>>> -
>>>>> -             result = unlink(server->socketpath);
>>>>> -             if(result == -1) {
>>>>> -                     PERROR("unlink");
>>>>> -             }
>>>>> -     }
>>>>> -
>>>>> -     free(server->socketpath);
>>>>> -
>>>>> -     result = close(server->listen_fd);
>>>>> -     if(result == -1) {
>>>>> -             PERROR("close");
>>>>> -             return;
>>>>> -     }
>>>>> -}
>>>>> -
>>>>> -/* Free a traceable application server */
>>>>> -
>>>>> -void ustcomm_fini_app(struct ustcomm_app *handle, int keep_socket_file)
>>>>> -{
>>>>> -     ustcomm_fini_server(&handle->server, keep_socket_file);
>>>>> -}
>>>>> -
>>>>> -/* Free a ustd server */
>>>>> -
>>>>> -void ustcomm_fini_ustd(struct ustcomm_ustd *handle)
>>>>> -{
>>>>> -     ustcomm_fini_server(&handle->server, 0);
>>>>> -}
>>>>>
>>>>>  static const char *find_tok(const char *str)
>>>>>  {
>>>>> @@ -884,89 +751,3 @@ char *nth_token(const char *str, int tok_no)
>>>>>
>>>>>       return retval;
>>>>>  }
>>>>> -
>>>>> -/* Callback from multipoll.
>>>>> - * Receive a new connection on the listening socket.
>>>>> - */
>>>>> -
>>>>> -static int process_mp_incoming_conn(void *priv, int fd, short events)
>>>>> -{
>>>>> -     struct ustcomm_connection *newconn;
>>>>> -     struct ustcomm_server *server = (struct ustcomm_server *) priv;
>>>>> -     int newfd;
>>>>> -     int result;
>>>>> -
>>>>> -     result = newfd = accept(server->listen_fd, NULL, NULL);
>>>>> -     if(result == -1) {
>>>>> -             PERROR("accept");
>>>>> -             return -1;
>>>>> -     }
>>>>> -
>>>>> -     newconn = (struct ustcomm_connection *) zmalloc(sizeof(struct ustcomm_connection));
>>>>> -     if(newconn == NULL) {
>>>>> -             ERR("zmalloc returned NULL");
>>>>> -             return -1;
>>>>> -     }
>>>>> -
>>>>> -     ustcomm_init_connection(newconn);
>>>>> -     newconn->fd = newfd;
>>>>> -
>>>>> -     list_add(&newconn->list, &server->connections);
>>>>> -
>>>>> -     return 0;
>>>>> -}
>>>>> -
>>>>> -/* Callback from multipoll.
>>>>> - * Receive a message on an existing connection.
>>>>> - */
>>>>> -
>>>>> -static int process_mp_conn_msg(void *priv, int fd, short revents)
>>>>> -{
>>>>> -     struct ustcomm_multipoll_conn_info *mpinfo = (struct ustcomm_multipoll_conn_info *) priv;
>>>>> -     int result;
>>>>> -     char *msg;
>>>>> -     struct ustcomm_source src;
>>>>> -
>>>>> -     if(revents) {
>>>>> -             src.fd = fd;
>>>>> -
>>>>> -             result = recv_message_conn(mpinfo->conn, &msg);
>>>>> -             if(result == -1) {
>>>>> -                     ERR("error in recv_message_conn");
>>>>> -             }
>>>>> -
>>>>> -             else if(result == 0) {
>>>>> -                     /* connection finished */
>>>>> -                     ustcomm_close_app(mpinfo->conn);
>>>>> -                     list_del(&mpinfo->conn->list);
>>>>> -                     free(mpinfo->conn);
>>>>> -             }
>>>>> -             else {
>>>>> -                     mpinfo->cb(msg, &src);
>>>>> -                     free(msg);
>>>>> -             }
>>>>> -     }
>>>>> -
>>>>> -     return 0;
>>>>> -}
>>>>> -
>>>>> -int free_ustcomm_client_poll(void *data)
>>>>> -{
>>>>> -     free(data);
>>>>> -     return 0;
>>>>> -}
>>>>> -
>>>>> -void ustcomm_mp_add_app_clients(struct mpentries *ent, struct ustcomm_app *app, int (*cb)(char *recvbuf, struct ustcomm_source *src))
>>>>> -{
>>>>> -     struct ustcomm_connection *conn;
>>>>> -
>>>>> -     /* add listener socket */
>>>>> -     multipoll_add(ent, app->server.listen_fd, POLLIN, process_mp_incoming_conn, &app->server, NULL);
>>>>> -
>>>>> -     list_for_each_entry(conn, &app->server.connections, list) {
>>>>> -             struct ustcomm_multipoll_conn_info *mpinfo = (struct ustcomm_multipoll_conn_info *) zmalloc(sizeof(struct ustcomm_multipoll_conn_info));
>>>>> -             mpinfo->conn = conn;
>>>>> -             mpinfo->cb = cb;
>>>>> -             multipoll_add(ent, conn->fd, POLLIN, process_mp_conn_msg, mpinfo, free_ustcomm_client_poll);
>>>>> -     }
>>>>> -}
>>>>> diff --git a/libustcomm/ustcomm.h b/libustcomm/ustcomm.h
>>>>> index f96ca16..f3c07b6 100644
>>>>> --- a/libustcomm/ustcomm.h
>>>>> +++ b/libustcomm/ustcomm.h
>>>>> @@ -23,73 +23,62 @@
>>>>>  #include <urcu/list.h>
>>>>>
>>>>>  #include <ust/kcompat/kcompat.h>
>>>>> -#include "multipoll.h"
>>>>>
>>>>>  #define SOCK_DIR "/tmp/ust-app-socks"
>>>>>  #define UST_SIGNAL SIGIO
>>>>>
>>>>> -struct ustcomm_connection {
>>>>> +struct ustcomm_sock {
>>>>>       struct list_head list;
>>>>>       int fd;
>>>>> -     /* Data that has not yet been consumed: */
>>>>> -     char *recv_buf;
>>>>> -     int recv_buf_size;
>>>>> -     int recv_buf_alloc;
>>>>> +     int epoll_fd;
>>>>>  };
>>>>>
>>>>> -/* ustcomm_server must be shallow-copyable */
>>>>> -struct ustcomm_server {
>>>>> -     /* the "server" socket for serving the external requests */
>>>>> -     int listen_fd;
>>>>> -     char *socketpath;
>>>>> -
>>>>> -     struct list_head connections;
>>>>> -};
>>>>> -
>>>>> -struct ustcomm_ustd {
>>>>> -     struct ustcomm_server server;
>>>>> +struct ustcomm_header {
>>>>> +     int type;
>>>>> +     long size;
>>>>> +     int command;
>>>>> +     int response;
>>>>> +     int fd_included;
>>>>>  };
>>>>>
>>>>> -struct ustcomm_app {
>>>>> -     struct ustcomm_server server;
>>>>> -};
>>>>>
>>>>> -/* ustcomm_source must be shallow-copyable */
>>>>> -struct ustcomm_source {
>>>>> -     int fd;
>>>>> -     void *priv;
>>>>> -};
>>>>> +//int send_message_pid(pid_t pid, const char *msg, char **reply);
>>>>>
>>>>> -struct ustcomm_multipoll_conn_info {
>>>>> -     struct ustcomm_connection *conn;
>>>>> -     int (*cb)(char *msg, struct ustcomm_source *src);
>>>>> -};
>>>>> +/* Ensure directory existence, useful for unix sockets */
>>>>> +extern int ensure_dir_exists(const char *dir);
>>>>>
>>>>> -//int send_message_pid(pid_t pid, const char *msg, char **reply);
>>>>> -extern int ustcomm_request_consumer(pid_t pid, const char *channel);
>>>>> +/* Create and delete sockets */
>>>>> +extern struct ustcomm_sock * ustcomm_init_sock(int fd, int epoll_fd,
>>>>> +                                            struct list_head *list);
>>>>> +extern void ustcomm_del_sock(struct ustcomm_sock *sock, int keep_in_epoll);
>>>>>
>>>>> -extern int ustcomm_ustd_recv_message(struct ustcomm_ustd *ustd, char
>>>>> **msg, struct ustcomm_source *src, int timeout);
>>>>> -extern int ustcomm_app_recv_message(struct ustcomm_app *app, char
>>>>> **msg, struct ustcomm_source *src, int timeout);
>>>>> +/* Create and delete named sockets */
>>>>> +extern struct ustcomm_sock * ustcomm_init_named_socket(const char *name,
>>>>> +                                                    int epoll_fd);
>>>>> +extern void ustcomm_del_named_sock(struct ustcomm_sock *sock,
>>>>> +                                int keep_socket_file);
>>>>>
>>>>> -extern int ustcomm_init_app(pid_t pid, struct ustcomm_app *handle);
>>>>> -extern void ustcomm_fini_app(struct ustcomm_app *handle, int
>>>>> keep_socket_file);
>>>>> -extern void ustcomm_fini_ustd(struct ustcomm_ustd *handle);
>>>>> +/* Send and receive functions for file descriptors */
>>>>> +extern int ustcomm_send_fd(int sock, const struct ustcomm_header *header,
>>>>> +                        const char *data, int *fd);
>>>>> +extern int ustcomm_recv_fd(int sock, struct ustcomm_header *header,
>>>>> +                        char **data, int *fd);
>>>>>
>>>>> -extern int ustcomm_init_ustd(struct ustcomm_ustd *handle, const char
>>>>> *sock_path);
>>>>> +/* Normal send and receive functions */
>>>>> +extern int ustcomm_send(int sock, const struct ustcomm_header *header,
>>>>> +                     const char *data);
>>>>> +extern int ustcomm_recv(int sock, struct ustcomm_header *header,
>>>>> +                     char **data);
>>>>>
>>>>> -extern int ustcomm_connect_app(pid_t pid, struct ustcomm_connection
>>>>> *conn);
>>>>> -extern int ustcomm_close_app(struct ustcomm_connection *conn);
>>>>> -extern int ustcomm_connect_path(const char *path, struct
>>>>> ustcomm_connection *conn, pid_t signalpid);
>>>>> -extern int ustcomm_send_request(struct ustcomm_connection *conn,
>>>>> const char *req, char **reply);
>>>>> -extern int ustcomm_send_reply(struct ustcomm_server *server, char
>>>>> *msg, struct ustcomm_source *src);
>>>>> -extern int ustcomm_disconnect(struct ustcomm_connection *conn);
>>>>> -extern int ustcomm_close_all_connections(struct ustcomm_server
>>>>> *server);
>>>>> -extern void ustcomm_mp_add_app_clients(struct mpentries *ent, struct
>>>>> ustcomm_app *app, int (*cb)(char *recvbuf, struct ustcomm_source *src));
>>>>>
>>>>> +extern int ustcomm_request_consumer(pid_t pid, const char *channel);
>>>>> +extern int ustcomm_connect_app(pid_t pid, int *app_fd);
>>>>> +extern int ustcomm_connect_path(const char *path, int *connection_fd);
>>>>> +extern int ustcomm_send_request(int sock, const char *req, char **reply);
>>>>> +extern int ustcomm_send_reply(char *msg, int sock);
>>>>> +extern int recv_message_conn(int sock, char **msg);
>>>>>  extern int nth_token_is(const char *str, const char *token, int tok_no);
>>>>>
>>>>>  extern char *nth_token(const char *str, int tok_no);
>>>>>
>>>>> -extern int pid_is_online(pid_t);
>>>>> -
>>>>>  #endif /* USTCOMM_H */
>>>>> diff --git a/libustd/libustd.c b/libustd/libustd.c
>>>>> index 999e4da..cb5b123 100644
>>>>> --- a/libustd/libustd.c
>>>>> +++ b/libustd/libustd.c
>>>>> @@ -18,7 +18,10 @@
>>>>>
>>>>>  #define _GNU_SOURCE
>>>>>
>>>>> +#include <sys/epoll.h>
>>>>>  #include <sys/shm.h>
>>>>> +#include <sys/types.h>
>>>>> +#include <sys/stat.h>
>>>>>  #include <unistd.h>
>>>>>  #include <pthread.h>
>>>>>  #include <signal.h>
>>>>> @@ -64,7 +67,8 @@ int get_subbuffer(struct buffer_info *buf)
>>>>>               retval = -1;
>>>>>               goto end;
>>>>>       }
>>>>> -     result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
>>>>> +
>>>>> +     result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
>>>>>       if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) {
>>>>>               DBG("app died while being traced");
>>>>>               retval = GET_SUBBUF_DIED;
>>>>> @@ -84,20 +88,14 @@ int get_subbuffer(struct buffer_info *buf)
>>>>>               goto end_rep;
>>>>>       }
>>>>>
>>>>> -     if(!strcmp(rep_code, "OK")) {
>>>>> +     if (!strcmp(rep_code, "OK")) {
>>>>>               DBG("got subbuffer %s", buf->name);
>>>>>               retval = GET_SUBBUF_OK;
>>>>> -     }
>>>>> -     else if(nth_token_is(received_msg, "END", 0) == 1) {
>>>>> -             retval = GET_SUBBUF_DONE;
>>>>> -             goto end_rep;
>>>>> -     }
>>>>> -     else if(!strcmp(received_msg, "NOTFOUND")) {
>>>>> +     } else if(!strcmp(received_msg, "NOTFOUND")) {
>>>>>               DBG("For buffer %s, the trace was not found. This
>>>>> likely means it was destroyed by the user.", buf->name);
>>>>>               retval = GET_SUBBUF_DIED;
>>>>>               goto end_rep;
>>>>> -     }
>>>>> -     else {
>>>>> +     } else {
>>>>>               DBG("error getting subbuffer %s", buf->name);
>>>>>               retval = -1;
>>>>>       }
>>>>> @@ -129,7 +127,7 @@ int put_subbuffer(struct buffer_info *buf)
>>>>>               retval = -1;
>>>>>               goto end;
>>>>>       }
>>>>> -     result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
>>>>> +     result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
>>>>>       if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) {
>>>>>               retval = PUT_SUBBUF_DIED;
>>>>>               goto end;
>>>>> @@ -200,6 +198,7 @@ struct buffer_info *connect_buffer(struct
>>>>> libustd_instance *instance, pid_t pid,
>>>>>       char *received_msg;
>>>>>       int result;
>>>>>       struct shmid_ds shmds;
>>>>> +     struct ustcomm_header header;
>>>>>
>>>>>       buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info));
>>>>>       if(buf == NULL) {
>>>>> @@ -207,18 +206,12 @@ struct buffer_info *connect_buffer(struct
>>>>> libustd_instance *instance, pid_t pid,
>>>>>               return NULL;
>>>>>       }
>>>>>
>>>>> -     buf->conn = malloc(sizeof(struct ustcomm_connection));
>>>>> -     if(buf->conn == NULL) {
>>>>> -             ERR("add_buffer: insufficient memory");
>>>>> -             free(buf);
>>>>> -             return NULL;
>>>>> -     }
>>>>> -
>>>>>       buf->name = bufname;
>>>>>       buf->pid = pid;
>>>>>
>>>>> +     /* FIXME: Fix all the freeing and exit sequences in this function */
>>>>>       /* connect to app */
>>>>> -     result = ustcomm_connect_app(buf->pid, buf->conn);
>>>>> +     result = ustcomm_connect_app(buf->pid, &buf->app_sock);
>>>>>       if(result) {
>>>>>               WARN("unable to connect to process, it probably died
>>>>> before we were able to connect");
>>>>>               return NULL;
>>>>> @@ -229,7 +222,7 @@ struct buffer_info *connect_buffer(struct
>>>>> libustd_instance *instance, pid_t pid,
>>>>>               ERR("connect_buffer : asprintf failed (get_pidunique)");
>>>>>               return NULL;
>>>>>       }
>>>>> -     result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
>>>>> +     result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
>>>>>       free(send_msg);
>>>>>       if(result == -1) {
>>>>>               ERR("problem in ustcomm_send_request(get_pidunique)");
>>>>> @@ -253,7 +246,7 @@ struct buffer_info *connect_buffer(struct
>>>>> libustd_instance *instance, pid_t pid,
>>>>>                   buf->name);
>>>>>               return NULL;
>>>>>       }
>>>>> -     result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
>>>>> +     result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
>>>>>       free(send_msg);
>>>>>       if(result == -1) {
>>>>>               ERR("problem in ustcomm_send_request(get_shmid)");
>>>>> @@ -277,7 +270,7 @@ struct buffer_info *connect_buffer(struct
>>>>> libustd_instance *instance, pid_t pid,
>>>>>                   buf->name);
>>>>>               return NULL;
>>>>>       }
>>>>> -     result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
>>>>> +     result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
>>>>>       free(send_msg);
>>>>>       if(result == -1) {
>>>>>               ERR("problem in ustcomm_send_request(g_n_subbufs)");
>>>>> @@ -301,7 +294,7 @@ struct buffer_info *connect_buffer(struct
>>>>> libustd_instance *instance, pid_t pid,
>>>>>                   buf->name);
>>>>>               return NULL;
>>>>>       }
>>>>> -     result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
>>>>> +     result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
>>>>>       free(send_msg);
>>>>>       if(result == -1) {
>>>>>               ERR("problem in ustcomm_send_request(get_subbuf_size)");
>>>>> @@ -342,6 +335,32 @@ struct buffer_info *connect_buffer(struct
>>>>> libustd_instance *instance, pid_t pid,
>>>>>       }
>>>>>       buf->memlen = shmds.shm_segsz;
>>>>>
>>>>> +     /* get buffer pipe fd */
>>>>> +     memset(&header, 0, sizeof(header));
>>>>> +     if (asprintf(&send_msg, "get_buffer_fd %s", buf->name) < 0) {
>>>>> +             ERR("connect_buffer : asprintf failed (get_buffer_fd %s)",
>>>>> +                 buf->name);
>>>>> +             return NULL;
>>>>> +     }
>>>>> +     header.size = strlen(send_msg) + 1;
>>>>> +     result = ustcomm_send(buf->app_sock, &header, send_msg);
>>>>> +     free(send_msg);
>>>>> +     if (result <= 0) {
>>>>> +             ERR("ustcomm_send failed.");
>>>>> +             return NULL;
>>>>> +     }
>>>>> +     result = ustcomm_recv_fd(buf->app_sock, &header, NULL, &buf->pipe_fd);
>>>>> +     if (result <= 0) {
>>>>> +             ERR("ustcomm_recv_fd failed");
>>>>> +             return NULL;
>>>>> +     } else {
>>>>> +             struct stat temp;
>>>>> +             fstat(buf->pipe_fd, &temp);
>>>>> +             if (!S_ISFIFO(temp.st_mode)) {
>>>>> +                     ERR("Didn't receive a fifo from the app");
>>>>> +                     return NULL;
>>>>> +             }
>>>>> +     }
>>>>>       if(instance->callbacks->on_open_buffer)
>>>>>               instance->callbacks->on_open_buffer(instance->callbacks, buf);
>>>>>
>>>>> @@ -361,7 +380,7 @@ static void destroy_buffer(struct
>>>>> libustd_callbacks *callbacks,
>>>>>  {
>>>>>       int result;
>>>>>
>>>>> -     result = ustcomm_close_app(buf->conn);
>>>>> +     result = close(buf->app_sock);
>>>>>       if(result == -1) {
>>>>>               WARN("problem calling ustcomm_close_app");
>>>>>       }
>>>>> @@ -379,28 +398,31 @@ static void destroy_buffer(struct
>>>>> libustd_callbacks *callbacks,
>>>>>       if(callbacks->on_close_buffer)
>>>>>               callbacks->on_close_buffer(callbacks, buf);
>>>>>
>>>>> -     free(buf->conn);
>>>>>       free(buf);
>>>>>  }
>>>>>
>>>>>  int consumer_loop(struct libustd_instance *instance, struct
>>>>> buffer_info *buf)
>>>>>  {
>>>>> -     int result;
>>>>> +     int result, read_result;
>>>>> +     char read_buf;
>>>>>
>>>>>       pthread_cleanup_push(decrement_active_buffers, instance);
>>>>>
>>>>>       for(;;) {
>>>>> +             read_result = read(buf->pipe_fd, &read_buf, 1);
>>>>>               /* get the subbuffer */
>>>>> -             result = get_subbuffer(buf);
>>>>> -             if(result == -1) {
>>>>> -                     ERR("error getting subbuffer");
>>>>> -                     continue;
>>>>> -             }
>>>>> -             else if(result == GET_SUBBUF_DONE) {
>>>>> -                     /* this is done */
>>>>> -                     break;
>>>>> -             }
>>>>> -             else if(result == GET_SUBBUF_DIED) {
>>>>> +             if (read_result == 1) {
>>>>> +                     result = get_subbuffer(buf);
>>>>> +                     if(result == -1) {
>>>>> +                             ERR("error getting subbuffer");
>>>>> +                             continue;
>>>>> +                     } else if (result == GET_SUBBUF_DIED) {
>>>>> +                             finish_consuming_dead_subbuffer(instance->callbacks, buf);
>>>>> +                             break;
>>>>> +                     }
>>>>> +             } else if ((read_result == -1 && (errno == ECONNRESET || errno == EPIPE)) ||
>>>>> +                        read_result == 0) {
>>>>> +                     DBG("App died while being traced");
>>>>>                       finish_consuming_dead_subbuffer(instance->callbacks, buf);
>>>>>                       break;
>>>>>               }
>>>>> @@ -541,64 +563,86 @@ int start_consuming_buffer(
>>>>>
>>>>>       return 0;
>>>>>  }
>>>>> +static void process_client_cmd(char *recvbuf, struct
>>>>> libustd_instance *instance)
>>>>> +{
>>>>> +     if(!strncmp(recvbuf, "collect", 7)) {
>>>>> +             pid_t pid;
>>>>> +             char *bufname;
>>>>> +             int result;
>>>>> +
>>>>> +             result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
>>>>> +             if (result != 2) {
>>>>> +                     ERR("parsing error: %s", recvbuf);
>>>>> +                     goto free_bufname;
>>>>> +             }
>>>>> +
>>>>> +             result = start_consuming_buffer(instance, pid, bufname);
>>>>> +             if (result < 0) {
>>>>> +                     ERR("error in add_buffer");
>>>>> +                     goto free_bufname;
>>>>> +             }
>>>>> +
>>>>> +     free_bufname:
>>>>> +             free(bufname);
>>>>> +     } else if(!strncmp(recvbuf, "exit", 4)) {
>>>>> +             /* Only there to force poll to return */
>>>>> +     } else {
>>>>> +             WARN("unknown command: %s", recvbuf);
>>>>> +     }
>>>>> +}
>>>>> +
>>>>> +#define MAX_EVENTS 10
>>>>>
>>>>>  int libustd_start_instance(struct libustd_instance *instance)
>>>>>  {
>>>>> -     int result;
>>>>> -     int timeout = -1;
>>>>> +     struct ustcomm_sock *epoll_sock;
>>>>> +     struct epoll_event events[MAX_EVENTS];
>>>>> +     struct sockaddr addr;
>>>>> +     int result, epoll_fd, accept_fd, nfds, i, addr_size, timeout;
>>>>>
>>>>>       if(!instance->is_init) {
>>>>>               ERR("libustd instance not initialized");
>>>>>               return 1;
>>>>>       }
>>>>> +     epoll_fd = instance->epoll_fd;
>>>>> +
>>>>> +     timeout = -1;
>>>>>
>>>>>       /* app loop */
>>>>>       for(;;) {
>>>>> -             char *recvbuf;
>>>>> -
>>>>> -             /* check for requests on our public socket */
>>>>> -             result = ustcomm_ustd_recv_message(instance->comm,
>>>>> &recvbuf, NULL, timeout);
>>>>> -             if(result == -1 && errno == EINTR) {
>>>>> +             nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout);
>>>>> +             if (nfds == -1 && errno == EINTR) {
>>>>>                       /* Caught signal */
>>>>> +             } else if (nfds == -1) {
>>>>> +                     ERR("epoll_wait");
>>>>>               }
>>>>> -             else if(result == -1) {
>>>>> -                     ERR("error in ustcomm_ustd_recv_message");
>>>>> -                     goto loop_end;
>>>>> -             }
>>>>> -             else if(result > 0) {
>>>>> -                     if(!strncmp(recvbuf, "collect", 7)) {
>>>>> -                             pid_t pid;
>>>>> -                             char *bufname;
>>>>> -                             int result;
>>>>> -
>>>>> -                             result = sscanf(recvbuf, "%*s %d
>>>>> %50as", &pid, &bufname);
>>>>> -                             if(result != 2) {
>>>>> -                                     ERR("parsing error: %s", recvbuf);
>>>>> -                                     goto free_bufname;
>>>>> -                             }
>>>>>
>>>>> -                             result =
>>>>> start_consuming_buffer(instance, pid, bufname);
>>>>> -                             if(result < 0) {
>>>>> -                                     ERR("error in add_buffer");
>>>>> -                                     goto free_bufname;
>>>>> +             for (i = 0; i < nfds; ++i) {
>>>>> +                     epoll_sock = (struct ustcomm_sock *)events[i].data.ptr;
>>>>> +                     if (epoll_sock == instance->listen_sock) {
>>>>> +                             addr_size = sizeof(struct sockaddr);
>>>>> +                             accept_fd = accept(epoll_sock->fd,
>>>>> +                                                &addr,
>>>>> +                                                (socklen_t *)&addr_size);
>>>>> +                             if (accept_fd == -1) {
>>>>> +                                     ERR("accept failed\n");
>>>>> +                             }
>>>>> +                             ustcomm_init_sock(accept_fd, epoll_fd,
>>>>> +                                              &instance->connections);
>>>>> +                     } else {
>>>>> +                             char *msg = NULL;
>>>>> +                             result = recv_message_conn(epoll_sock->fd, &msg);
>>>>> +                             if (result == 0) {
>>>>> +                                     ustcomm_del_sock(epoll_sock, 0);
>>>>> +                             } else if (msg) {
>>>>> +                                     process_client_cmd(msg, instance);
>>>>> +                                     free(msg);
>>>>>                               }
>>>>>
>>>>> -                             free_bufname:
>>>>> -                             free(bufname);
>>>>> -                     }
>>>>> -                     else if(!strncmp(recvbuf, "exit", 4)) {
>>>>> -                             /* Only there to force poll to return */
>>>>> -                     }
>>>>> -                     else {
>>>>> -                             WARN("unknown command: %s", recvbuf);
>>>>>                       }
>>>>> -
>>>>> -                     free(recvbuf);
>>>>>               }
>>>>>
>>>>> -             loop_end:
>>>>> -
>>>>> -             if(instance->quit_program) {
>>>>> +             if (instance->quit_program) {
>>>>>                       pthread_mutex_lock(&instance->mutex);
>>>>>                       if(instance->active_buffers == 0) {
>>>>>                               pthread_mutex_unlock(&instance->mutex);
>>>>> @@ -617,14 +661,16 @@ int libustd_start_instance(struct
>>>>> libustd_instance *instance)
>>>>>       return 0;
>>>>>  }
>>>>>
>>>>> +/* FIXME: threads and connections !? */
>>>>>  void libustd_delete_instance(struct libustd_instance *instance)
>>>>>  {
>>>>> -     if(instance->is_init)
>>>>> -             ustcomm_fini_ustd(instance->comm);
>>>>> +     if (instance->is_init) {
>>>>> +             ustcomm_del_named_sock(instance->listen_sock, 0);
>>>>> +             close(instance->epoll_fd);
>>>>> +     }
>>>>>
>>>>>       pthread_mutex_destroy(&instance->mutex);
>>>>>       free(instance->sock_path);
>>>>> -     free(instance->comm);
>>>>>       free(instance);
>>>>>  }
>>>>>
>>>>> @@ -669,17 +715,13 @@ int libustd_stop_instance(struct
>>>>> libustd_instance *instance, int send_msg)
>>>>>       return 0;
>>>>>  }
>>>>>
>>>>> -struct libustd_instance *libustd_new_instance(
>>>>> -     struct libustd_callbacks *callbacks, char *sock_path)
>>>>> +struct libustd_instance
>>>>> +*libustd_new_instance(struct libustd_callbacks *callbacks,
>>>>> +                   char *sock_path)
>>>>>  {
>>>>>       struct libustd_instance *instance =
>>>>>               zmalloc(sizeof(struct libustd_instance));
>>>>> -     if(!instance)
>>>>> -             return NULL;
>>>>> -
>>>>> -     instance->comm = malloc(sizeof(struct ustcomm_ustd));
>>>>> -     if(!instance->comm) {
>>>>> -             free(instance);
>>>>> +     if(!instance) {
>>>>>               return NULL;
>>>>>       }
>>>>>
>>>>> @@ -689,18 +731,75 @@ struct libustd_instance *libustd_new_instance(
>>>>>       instance->active_buffers = 0;
>>>>>       pthread_mutex_init(&instance->mutex, NULL);
>>>>>
>>>>> -     if(sock_path)
>>>>> +     if (sock_path) {
>>>>>               instance->sock_path = strdup(sock_path);
>>>>> -     else
>>>>> +     } else {
>>>>>               instance->sock_path = NULL;
>>>>> +     }
>>>>>
>>>>>       return instance;
>>>>>  }
>>>>>
>>>>> +static int init_ustd_socket(struct libustd_instance *instance)
>>>>> +{
>>>>> +     char *name;
>>>>> +
>>>>> +     if (instance->sock_path) {
>>>>> +             if (asprintf(&name, "%s", instance->sock_path) < 0) {
>>>>> +                     ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)",
>>>>> +                         instance->sock_path);
>>>>> +                     return -1;
>>>>> +             }
>>>>> +     } else {
>>>>> +             int result;
>>>>> +
>>>>> +             /* Only check if socket dir exists if we are using the default directory */
>>>>> +             result = ensure_dir_exists(SOCK_DIR);
>>>>> +             if (result == -1) {
>>>>> +                     ERR("Unable to create socket directory %s", SOCK_DIR);
>>>>> +                     return -1;
>>>>> +             }
>>>>> +
>>>>> +             if (asprintf(&name, "%s/%s", SOCK_DIR, "ustd") < 0) {
>>>>> +                     ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)",
>>>>> +                         SOCK_DIR);
>>>>> +                     return -1;
>>>>> +             }
>>>>> +     }
>>>>> +
>>>>> +     /* Set up epoll */
>>>>> +     instance->epoll_fd = epoll_create(MAX_EVENTS);
>>>>> +     if (instance->epoll_fd == -1) {
>>>>> +             ERR("epoll_create failed, start instance bailing");
>>>>> +             goto free_name;
>>>>> +     }
>>>>> +
>>>>> +     /* Create the named socket */
>>>>> +     instance->listen_sock = ustcomm_init_named_socket(name,
>>>>> +                                                       instance->epoll_fd);
>>>>> +     if(!instance->listen_sock) {
>>>>> +             ERR("error initializing named socket at %s", name);
>>>>> +             goto close_epoll;
>>>>> +     }
>>>>> +
>>>>> +     INIT_LIST_HEAD(&instance->connections);
>>>>> +
>>>>> +     free(name);
>>>>> +
>>>>> +     return 0;
>>>>> +
>>>>> +close_epoll:
>>>>> +     close(instance->epoll_fd);
>>>>> +free_name:
>>>>> +     free(name);
>>>>> +
>>>>> +     return -1;
>>>>> +}
>>>>> +
>>>>>  int libustd_init_instance(struct libustd_instance *instance)
>>>>>  {
>>>>>       int result;
>>>>> -     result = ustcomm_init_ustd(instance->comm, instance->sock_path);
>>>>> +     result = init_ustd_socket(instance);
>>>>>       if(result == -1) {
>>>>>               ERR("failed to initialize socket");
>>>>>               return 1;
>>
>>>
>
> - --
> David Goulet
> LTTng project, DORSAL Lab.
>
> 1024D/16BD8563
> BE3C 672B 9331 9796 291A  14C6 4AF7 C14B 16BD 8563
> -----BEGIN PGP SIGNATURE-----
> Version: GnuPG v1.4.10 (GNU/Linux)
>
> iEYEARECAAYFAkycmMMACgkQSvfBSxa9hWPs+ACdGNYhom4kQdVkED44tJqh6Mcv
> VkUAn0o0e3RXloPBFMm4cHngoOZWb5za
> =Zuta
> -----END PGP SIGNATURE-----
>
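
For anyone following the quoted patch: the new struct ustcomm_header is sent
as a raw fixed-size struct, which only works because both ends are built from
the same headers on the same machine. Below is a minimal sketch of that kind
of framing, not the code from the patch. The bodies of ustcomm_send() and
ustcomm_recv() are not in the part of the diff quoted here, so sketch_send(),
sketch_recv() and sketch_roundtrip() are hypothetical names, and the return
convention (1 on success, 0 when the peer has closed, -1 on error) is an
assumption based on how the callers check "result <= 0". File descriptor
passing is left out.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Same fields as struct ustcomm_header in the quoted patch. */
struct ustcomm_header {
	int type;
	long size;
	int command;
	int response;
	int fd_included;
};

/* Hypothetical helper: send the raw header, then header->size payload bytes. */
static int sketch_send(int sock, const struct ustcomm_header *header,
		       const char *data)
{
	if (send(sock, header, sizeof(*header), MSG_NOSIGNAL)
	    != (ssize_t)sizeof(*header))
		return -1;
	if (header->size > 0 &&
	    send(sock, data, header->size, MSG_NOSIGNAL)
	    != (ssize_t)header->size)
		return -1;
	return 1;
}

/* Hypothetical helper: read one header, then allocate and read its payload.
 * Returns 1 on success, 0 if the peer closed, -1 on error. */
static int sketch_recv(int sock, struct ustcomm_header *header, char **data)
{
	ssize_t n = recv(sock, header, sizeof(*header), MSG_WAITALL);

	*data = NULL;
	if (n == 0)
		return 0;
	if (n != (ssize_t)sizeof(*header))
		return -1;
	if (header->size > 0) {
		*data = malloc(header->size);
		if (!*data)
			return -1;
		if (recv(sock, *data, header->size, MSG_WAITALL)
		    != (ssize_t)header->size) {
			free(*data);
			*data = NULL;
			return -1;
		}
	}
	return 1;
}

/* Send one command over a socketpair and read it back. Returns 0 on success. */
static int sketch_roundtrip(void)
{
	const char *msg = "get_buffer_fd demo_buf"; /* buffer name is made up */
	struct ustcomm_header out, in;
	char *reply = NULL;
	int sv[2], ret = -1;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
		return -1;

	memset(&out, 0, sizeof(out));
	out.size = strlen(msg) + 1; /* payload includes the terminating NUL */

	if (sketch_send(sv[0], &out, msg) == 1 &&
	    sketch_recv(sv[1], &in, &reply) == 1 &&
	    in.size == out.size && reply && strcmp(reply, msg) == 0)
		ret = 0;

	free(reply);
	close(sv[0]);
	close(sv[1]);
	return ret;
}
```

There is no string parsing to find the message boundary and a single
allocation per message, which is the point of the fixed-size header.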


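The epoll loop in libustd_start_instance() relies on events[i].data.ptr
pointing back at the struct ustcomm_sock that was registered, and treats a
read of 0 bytes as "peer closed, delete the socket". A small sketch of that
pattern follows, under the assumption that ustcomm_init_sock() registers the
fd with EPOLLIN and stores the struct pointer in data.ptr (its body is not in
the quoted part of the diff). All sketch_* names are hypothetical. Unlike the
quoted loop, the registration helper refuses a negative fd, so a failed
accept() would not be added to the epoll set.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Reduced stand-in for struct ustcomm_sock (list member omitted). */
struct sketch_sock {
	int fd;
	int epoll_fd;
};

/* Register fd for EPOLLIN and keep a pointer to the wrapper in data.ptr. */
static struct sketch_sock *sketch_init_sock(int fd, int epoll_fd)
{
	struct sketch_sock *sock;
	struct epoll_event ev;

	if (fd < 0)
		return NULL;
	sock = malloc(sizeof(*sock));
	if (!sock)
		return NULL;
	sock->fd = fd;
	sock->epoll_fd = epoll_fd;

	memset(&ev, 0, sizeof(ev));
	ev.events = EPOLLIN;
	ev.data.ptr = sock;
	if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) == -1) {
		free(sock);
		return NULL;
	}
	return sock;
}

/* Remove from epoll, close the fd and free the wrapper. */
static void sketch_del_sock(struct sketch_sock *sock)
{
	epoll_ctl(sock->epoll_fd, EPOLL_CTL_DEL, sock->fd, NULL);
	close(sock->fd);
	free(sock);
}

/* Returns the number of messages read before the peer closed, or -1. */
static int sketch_epoll_demo(void)
{
	struct epoll_event events[4];
	struct sketch_sock *conn;
	int sv[2], epoll_fd, handled = 0, done = 0;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
		return -1;
	epoll_fd = epoll_create(4);
	if (epoll_fd == -1) {
		close(sv[0]);
		close(sv[1]);
		return -1;
	}
	conn = sketch_init_sock(sv[0], epoll_fd);
	if (!conn) {
		close(sv[0]);
		close(sv[1]);
		close(epoll_fd);
		return -1;
	}

	/* Act as the client: send one command, then hang up. */
	if (write(sv[1], "exit", 5) != 5)
		handled = -1;
	close(sv[1]);

	while (!done && handled >= 0) {
		int i, nfds = epoll_wait(epoll_fd, events, 4, 1000);

		if (nfds == -1 && errno == EINTR)
			continue;	/* caught a signal, retry */
		if (nfds <= 0)
			break;		/* error or timeout */
		for (i = 0; i < nfds; ++i) {
			struct sketch_sock *s = events[i].data.ptr;
			char buf[16];
			ssize_t n;

			assert(s == conn); /* data.ptr gives back our wrapper */
			n = read(s->fd, buf, sizeof(buf));
			if (n > 0) {
				handled++;
			} else {
				/* 0 means the peer closed the connection */
				sketch_del_sock(s);
				done = 1;
			}
		}
	}
	if (!done)
		sketch_del_sock(conn);
	close(epoll_fd);
	return handled;
}
```

Because epoll is level-triggered by default, the end-of-file condition is
reported on the next epoll_wait() after the pending data has been read, so
the connection is cleaned up without a separate hang-up check.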

