[ltt-dev] [UST PATCH] Re-write ustcomm parts of UST
Nils Carlson
nils.carlson at ericsson.com
Thu Sep 23 06:13:40 EDT 2010
This is a very big patch, and so it requires a bit of explaining.
This patch is a step toward accomplishing several goals I have in this
area:
1. Use enums for commands and eliminate text-based commands. This does not mean
that we will stop processing strings for trace/channel and marker names;
just that the long series of if statements with token and string matching
will be replaced with a switch statement. To this end I have created a
ustcomm_header struct that contains the length of the data-field and some
other fields. This allows us to first receive the header, allocate memory
for the data and then receive the data; eliminating all scanning of messages.
2. Reduce the complexity of the implementation. To put it simply, I don't like
callbacks. They reduce transparency and make it difficult to follow the
flow of the code; so I have eliminated multipoll replacing it with a normal
epoll. I have also replaced almost all the different server, connection and
source structs with one, called ustcomm_sock.
3. Make ustd scale better. Currently ustd scales terribly. We allocate one
thread per-cpu per-channel per-process; five applications each with three
channels on a four-cpu machine lead to 5*3*4=60 threads. Part of the reason
for this multitude of threads was that we used a ustcomm_request call
(consisting of a send and a receive) to wait for a subbuffer to be written.
The sequence for a subbuffer to be written was as follows:
Ustd calls send with a 'get_subbuffer' command, and then recv in one of
the threads and hangs on the recv on the socket.
Upon filling the subbuffer the traced app writes '1' to a pipe.
The ust_thread inside the app which was listening to the other end of the
pipe wakes up when the '1' is written. The callback from multipoll calls
a send which sends a reply to the ustd thread over the socket.
The ustd thread wakes up and reads the message, continuing along in its
execution.
I replace this with a bit of a different mechanism, which should allow us
to eventually reduce the number of threads to one per cpu:
Ustd requests a buffer_fd which causes the ustd_thread inside the app
to send the file-descriptor for the read end of the pipe to ustd.
The ustd thread now does a read on the pipe, halting its execution until
the app fills the subbuffer and writes '1' to the pipe, waking up the ustd
thread.
Ustd now makes the 'get_subbuffer' call which the ust_thread inside the
app responds to with information about the subbuffer. Ustd writes it and
then goes back to the read call, hanging on the pipe.
So we are still stuck on the multitude of threads, but we are in a much better
position to move forward. Replacing the read with an epoll statement and then
pointing the epoll event data at the buffer struct containing the current
buffer to which the pipe belongs should be relatively easy. We can then
instead of spawning a new thread for each buffer just allocate the
buffer_info struct and assign it to one of the per-cpu threads in ustd to
poll on.
4. Replace poll with epoll which scales better, especially for
events << (nr of fds). This is complete.
5. Allow UST to handle arbitrarily long unix socket names. This is done by
careful allocation of the sockaddr_un struct with a dynamic length.
Truncating is ugly and dangerous.
There is a lot of work still left to be done. This is only the first of a
number of patches that I expect in this area. If someone feels like tackling
ustd head on to reduce the number of threads that would be great.
I have kept Pierre-Marc's form of error handling for the I/O wrapping functions
because I want to propagate return codes up to the apps that are using them
so they can close file-descriptors and free associated resources. If somebody
knows of a better approach please make yourself heard.
Signed-off-by: Nils Carlson <nils.carlson at ericsson.com>
---
include/ust/ustd.h | 12 +-
libust/buffers.h | 5 +
libust/tracectl.c | 474 ++++++++++++++----------
libustcmd/ustcmd.c | 15 +-
libustcomm/Makefile.am | 5 +-
libustcomm/multipoll.c | 130 -------
libustcomm/multipoll.h | 44 ---
libustcomm/ustcomm.c | 969 +++++++++++++++++++-----------------------------
libustcomm/ustcomm.h | 83 ++---
libustd/libustd.c | 277 ++++++++++-----
10 files changed, 902 insertions(+), 1112 deletions(-)
delete mode 100644 libustcomm/multipoll.c
delete mode 100644 libustcomm/multipoll.h
diff --git a/include/ust/ustd.h b/include/ust/ustd.h
index 5fec7f9..7ce063f 100644
--- a/include/ust/ustd.h
+++ b/include/ust/ustd.h
@@ -29,16 +29,18 @@
#include <pthread.h>
#include <dirent.h>
#include <ust/kcompat/kcompat.h>
+#include <urcu/list.h>
#define USTD_DEFAULT_TRACE_PATH "/tmp/usttrace"
-struct ustcomm_connection;
-struct ustcomm_ustd;
+struct ustcomm_sock;
struct buffer_info {
const char *name;
pid_t pid;
- struct ustcomm_connection *conn;
+ int app_sock;
+ /* The pipe file descriptor */
+ int pipe_fd;
int shmid;
int bufstruct_shmid;
@@ -73,7 +75,9 @@ struct libustd_instance {
struct libustd_callbacks *callbacks;
int quit_program;
int is_init;
- struct ustcomm_ustd *comm;
+ struct list_head connections;
+ int epoll_fd;
+ struct ustcomm_sock *listen_sock;
char *sock_path;
pthread_mutex_t mutex;
int active_buffers;
diff --git a/libust/buffers.h b/libust/buffers.h
index 3044500..a2ad83e 100644
--- a/libust/buffers.h
+++ b/libust/buffers.h
@@ -82,6 +82,11 @@ struct ust_buffer {
int data_ready_fd_write;
/* the reading end of the pipe */
int data_ready_fd_read;
+ /*
+ * List of buffers with an open pipe, used for fork and forced subbuffer
+ * switch.
+ */
+ struct list_head open_buffers_list;
unsigned int finalized;
//ust// struct timer_list switch_timer; /* timer for periodical switch */
diff --git a/libust/tracectl.c b/libust/tracectl.c
index 60c375b..3472ca9 100644
--- a/libust/tracectl.c
+++ b/libust/tracectl.c
@@ -26,13 +26,15 @@
#include <stdint.h>
#include <pthread.h>
#include <signal.h>
+#include <sys/epoll.h>
+#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
-#include <sys/un.h>
#include <fcntl.h>
#include <poll.h>
#include <regex.h>
#include <urcu/uatomic_arch.h>
+#include <urcu/list.h>
#include <ust/marker.h>
#include <ust/tracepoint.h>
@@ -42,7 +44,6 @@
#include "ustcomm.h"
#include "buffers.h"
#include "marker-control.h"
-#include "multipoll.h"
#define USTSIGNAL SIGIO
@@ -55,50 +56,18 @@
*/
s64 pidunique = -1LL;
-extern struct chan_info_struct chan_infos[];
+static int epoll_fd;
+static struct ustcomm_sock *listen_sock;
-struct list_head blocked_consumers = LIST_HEAD_INIT(blocked_consumers);
+extern struct chan_info_struct chan_infos[];
-static struct ustcomm_app ustcomm_app;
+static struct list_head open_buffers_list = LIST_HEAD_INIT(open_buffers_list);
-struct tracecmd { /* no padding */
- uint32_t size;
- uint16_t command;
-};
+static struct list_head ust_socks = LIST_HEAD_INIT(ust_socks);
/* volatile because shared between the listener and the main thread */
int buffers_to_export = 0;
-struct trctl_msg {
- /* size: the size of all the fields except size itself */
- uint32_t size;
- uint16_t type;
- /* Only the necessary part of the payload is transferred. It
- * may even be none of it.
- */
- char payload[94];
-};
-
-struct consumer_channel {
- int fd;
- struct ltt_channel_struct *chan;
-};
-
-struct blocked_consumer {
- int fd_consumer;
- int fd_producer;
- int tmp_poll_idx;
-
- /* args to ustcomm_send_reply */
- struct ustcomm_server server;
- struct ustcomm_source src;
-
- /* args to ust_buffers_get_subbuf */
- struct ust_buffer *buf;
-
- struct list_head list;
-};
-
static long long make_pidunique(void)
{
s64 retval;
@@ -122,7 +91,12 @@ static void print_markers(FILE *fp)
marker_iter_start(&iter);
while (iter.marker) {
- fprintf(fp, "marker: %s/%s %d \"%s\" %p\n", iter.marker->channel, iter.marker->name, (int)imv_read(iter.marker->state), iter.marker->format, iter.marker->location);
+ fprintf(fp, "marker: %s/%s %d \"%s\" %p\n",
+ iter.marker->channel,
+ iter.marker->name,
+ (int)imv_read(iter.marker->state),
+ iter.marker->format,
+ iter.marker->location);
marker_iter_next(&iter);
}
unlock_markers();
@@ -143,8 +117,6 @@ static void print_trace_events(FILE *fp)
unlock_trace_events();
}
-static int init_socket(void);
-
/* Ask the daemon to collect a trace called trace_name and being
* produced by this pid.
*
@@ -179,7 +151,8 @@ static void inform_consumer_daemon(const char *trace_name)
}
result = ustcomm_request_consumer(pid, buf);
if (result == -1) {
- WARN("Failed to request collection for channel %s. Is the daemon available?", trace->channels[i].channel_name);
+ WARN("Failed to request collection for channel %s. Is the daemon available?",
+ trace->channels[i].channel_name);
/* continue even if fail */
}
free(buf);
@@ -192,74 +165,6 @@ static void inform_consumer_daemon(const char *trace_name)
ltt_unlock_traces();
}
-int process_blkd_consumer_act(void *priv, int fd, short events)
-{
- int result;
- long consumed_old = 0;
- char *reply;
- struct blocked_consumer *bc = (struct blocked_consumer *) priv;
- char inbuf;
-
- result = read(bc->fd_producer, &inbuf, 1);
- if (result == -1) {
- PERROR("read");
- return -1;
- }
- if (result == 0) {
- int res;
- DBG("listener: got messsage that a buffer ended");
-
- res = close(bc->fd_producer);
- if (res == -1) {
- PERROR("close");
- }
-
- list_del(&bc->list);
-
- result = ustcomm_send_reply(&bc->server, "END", &bc->src);
- if (result < 0) {
- ERR("ustcomm_send_reply failed");
- return -1;
- }
-
- return 0;
- }
-
- result = ust_buffers_get_subbuf(bc->buf, &consumed_old);
- if (result == -EAGAIN) {
- WARN("missed buffer?");
- return 0;
- } else if (result < 0) {
- ERR("ust_buffers_get_subbuf: error: %s", strerror(-result));
- }
- if (asprintf(&reply, "%s %ld", "OK", consumed_old) < 0) {
- ERR("process_blkd_consumer_act : asprintf failed (OK %ld)",
- consumed_old);
- return -1;
- }
- result = ustcomm_send_reply(&bc->server, reply, &bc->src);
- if (result < 0) {
- ERR("ustcomm_send_reply failed");
- free(reply);
- return -1;
- }
- free(reply);
-
- list_del(&bc->list);
-
- return 0;
-}
-
-void blocked_consumers_add_to_mp(struct mpentries *ent)
-{
- struct blocked_consumer *bc;
-
- list_for_each_entry(bc, &blocked_consumers, list) {
- multipoll_add(ent, bc->fd_producer, POLLIN, process_blkd_consumer_act, bc, NULL);
- }
-
-}
-
void seperate_channel_cpu(const char *channel_and_cpu, char **channel, int *cpu)
{
const char *sep;
@@ -279,7 +184,7 @@ void seperate_channel_cpu(const char *channel_and_cpu, char **channel, int *cpu)
}
}
-static int do_cmd_get_shmid(const char *recvbuf, struct ustcomm_source *src)
+static int do_cmd_get_shmid(const char *recvbuf, int sock)
{
int retval = 0;
struct ust_trace *trace;
@@ -333,7 +238,7 @@ static int do_cmd_get_shmid(const char *recvbuf, struct ustcomm_source *src)
goto free_short_chan_name;
}
- result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
+ result = ustcomm_send_reply(reply, sock);
if (result) {
ERR("ustcomm_send_reply failed");
free(reply);
@@ -359,7 +264,7 @@ static int do_cmd_get_shmid(const char *recvbuf, struct ustcomm_source *src)
return retval;
}
-static int do_cmd_get_n_subbufs(const char *recvbuf, struct ustcomm_source *src)
+static int do_cmd_get_n_subbufs(const char *recvbuf, int sock)
{
int retval = 0;
struct ust_trace *trace;
@@ -411,7 +316,7 @@ static int do_cmd_get_n_subbufs(const char *recvbuf, struct ustcomm_source *src)
goto free_short_chan_name;
}
- result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
+ result = ustcomm_send_reply(reply, sock);
if (result) {
ERR("ustcomm_send_reply failed");
free(reply);
@@ -435,7 +340,7 @@ static int do_cmd_get_n_subbufs(const char *recvbuf, struct ustcomm_source *src)
return retval;
}
-static int do_cmd_get_subbuf_size(const char *recvbuf, struct ustcomm_source *src)
+static int do_cmd_get_subbuf_size(const char *recvbuf, int sock)
{
int retval = 0;
struct ust_trace *trace;
@@ -487,7 +392,7 @@ static int do_cmd_get_subbuf_size(const char *recvbuf, struct ustcomm_source *sr
goto free_short_chan_name;
}
- result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
+ result = ustcomm_send_reply(reply, sock);
if (result) {
ERR("ustcomm_send_reply failed");
free(reply);
@@ -524,7 +429,7 @@ static unsigned int pow2_higher_or_eq(unsigned int v)
return retval<<1;
}
-static int do_cmd_set_subbuf_size(const char *recvbuf, struct ustcomm_source *src)
+static int do_cmd_set_subbuf_size(const char *recvbuf, int sock)
{
char *channel_slash_size;
char *ch_name = NULL;
@@ -581,7 +486,7 @@ static int do_cmd_set_subbuf_size(const char *recvbuf, struct ustcomm_source *sr
return retval;
}
-static int do_cmd_set_subbuf_num(const char *recvbuf, struct ustcomm_source *src)
+static int do_cmd_set_subbuf_num(const char *recvbuf, int sock)
{
char *channel_slash_num;
char *ch_name = NULL;
@@ -638,7 +543,102 @@ static int do_cmd_set_subbuf_num(const char *recvbuf, struct ustcomm_source *src
return retval;
}
-static int do_cmd_get_subbuffer(const char *recvbuf, struct ustcomm_source *src)
+static int do_cmd_get_subbuffer(const char *recvbuf, int sock)
+{
+ int retval = 0, found = 0;;
+ int i, ch_cpu, result;
+ long consumed_old = 0;
+ struct ust_trace *trace;
+ char trace_name[] = "auto";
+ char *channel_and_cpu;
+ char *ch_name;
+
+ DBG("get_subbuf");
+
+ channel_and_cpu = nth_token(recvbuf, 1);
+ if(channel_and_cpu == NULL) {
+ ERR("cannot parse channel");
+ retval = -1;
+ goto end;
+ }
+
+ seperate_channel_cpu(channel_and_cpu, &ch_name, &ch_cpu);
+ if(ch_cpu == -1) {
+ ERR("problem parsing channel name");
+ retval = -1;
+ goto free_short_chan_name;
+ }
+
+ ltt_lock_traces();
+ trace = _ltt_trace_find(trace_name);
+
+ if(trace == NULL) {
+ int result;
+
+ DBG("Cannot find trace. It was likely destroyed by the user.");
+ result = ustcomm_send_reply("NOTFOUND", sock);
+ if(result) {
+ ERR("ustcomm_send_reply failed");
+ retval = -1;
+ goto unlock_traces;
+ }
+
+ goto unlock_traces;
+ }
+
+ for(i=0; i<trace->nr_channels; i++) {
+ struct ust_channel *channel = &trace->channels[i];
+
+ if(!strcmp(trace->channels[i].channel_name, ch_name)) {
+ struct ust_buffer *buf = channel->buf[ch_cpu];
+ char *reply;
+
+ found = 1;
+
+ result = ust_buffers_get_subbuf(buf, &consumed_old);
+ if(result == -EAGAIN) {
+ WARN("missed buffer?");
+ return 0;
+ } else if (result < 0) {
+ ERR("ust_buffers_get_subbuf: error: %s", strerror(-result));
+ }
+ if (asprintf(&reply, "%s %ld", "OK", consumed_old) < 0) {
+ ERR("process_blkd_consumer_act : asprintf failed (OK %ld)",
+ consumed_old);
+ return -1;
+ }
+ result = ustcomm_send_reply(reply, sock);
+ if (result < 0) {
+ ERR("ustcomm_send_reply failed");
+ free(reply);
+ return -1;
+ }
+ free(reply);
+
+ break;
+ }
+ }
+ if(found == 0) {
+ result = ustcomm_send_reply("NOTFOUND", sock);
+ if (result <= 0) {
+ ERR("ustcomm_send_reply failed");
+ return -1;
+ }
+ ERR("unable to find channel");
+ }
+
+ unlock_traces:
+ ltt_unlock_traces();
+
+ free_short_chan_name:
+ free(ch_name);
+
+ end:
+ return retval;
+}
+
+
+static int do_cmd_get_buffer_fd(const char *recvbuf, int sock)
{
int retval = 0;
struct ust_trace *trace;
@@ -648,8 +648,9 @@ static int do_cmd_get_subbuffer(const char *recvbuf, struct ustcomm_source *src)
int found = 0;
char *ch_name;
int ch_cpu;
+ struct ustcomm_header header;
- DBG("get_subbuf");
+ DBG("get_buffer_fd");
channel_and_cpu = nth_token(recvbuf, 1);
if (channel_and_cpu == NULL) {
@@ -672,7 +673,7 @@ static int do_cmd_get_subbuffer(const char *recvbuf, struct ustcomm_source *src)
int result;
DBG("Cannot find trace. It was likely destroyed by the user.");
- result = ustcomm_send_reply(&ustcomm_app.server, "NOTFOUND", src);
+ result = ustcomm_send_reply("NOTFOUND", sock);
if (result) {
ERR("ustcomm_send_reply failed");
retval = -1;
@@ -687,22 +688,16 @@ static int do_cmd_get_subbuffer(const char *recvbuf, struct ustcomm_source *src)
if (!strcmp(trace->channels[i].channel_name, ch_name)) {
struct ust_buffer *buf = channel->buf[ch_cpu];
- struct blocked_consumer *bc;
found = 1;
- bc = (struct blocked_consumer *) zmalloc(sizeof(struct blocked_consumer));
- if (bc == NULL) {
- ERR("zmalloc returned NULL");
+ header.size = 0;
+ header.fd_included = 1;
+ if (ustcomm_send_fd(sock, &header, NULL,
+ &buf->data_ready_fd_read) <= 0) {
+ ERR("ustcomm_send_fd failed\n");
goto unlock_traces;
}
- bc->fd_consumer = src->fd;
- bc->fd_producer = buf->data_ready_fd_read;
- bc->buf = buf;
- bc->src = *src;
- bc->server = ustcomm_app.server;
-
- list_add(&bc->list, &blocked_consumers);
/* Being here is the proof the daemon has mapped the buffer in its
* memory. We may now decrement buffers_to_export.
@@ -712,6 +707,10 @@ static int do_cmd_get_subbuffer(const char *recvbuf, struct ustcomm_source *src)
STORE_SHARED(buffers_to_export, LOAD_SHARED(buffers_to_export)-1);
}
+ /* The buffer has been exported, ergo, we can add it to the
+ * list of open buffers
+ */
+ list_add(&buf->open_buffers_list, &open_buffers_list);
break;
}
}
@@ -729,7 +728,7 @@ static int do_cmd_get_subbuffer(const char *recvbuf, struct ustcomm_source *src)
return retval;
}
-static int do_cmd_put_subbuffer(const char *recvbuf, struct ustcomm_source *src)
+static int do_cmd_put_subbuffer(const char *recvbuf, int sock)
{
int retval = 0;
struct ust_trace *trace;
@@ -779,7 +778,7 @@ static int do_cmd_put_subbuffer(const char *recvbuf, struct ustcomm_source *src)
if (trace == NULL) {
DBG("Cannot find trace. It was likely destroyed by the user.");
- result = ustcomm_send_reply(&ustcomm_app.server, "NOTFOUND", src);
+ result = ustcomm_send_reply("NOTFOUND", sock);
if (result) {
ERR("ustcomm_send_reply failed");
retval = -1;
@@ -814,7 +813,7 @@ static int do_cmd_put_subbuffer(const char *recvbuf, struct ustcomm_source *src)
}
}
- result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
+ result = ustcomm_send_reply(reply, sock);
if (result) {
ERR("ustcomm_send_reply failed");
free(reply);
@@ -845,26 +844,26 @@ static int do_cmd_put_subbuffer(const char *recvbuf, struct ustcomm_source *src)
static void listener_cleanup(void *ptr)
{
- ustcomm_fini_app(&ustcomm_app, 0);
+ ustcomm_del_named_sock(listen_sock, 0);
}
static void do_cmd_force_switch()
{
- struct blocked_consumer *bc;
+ struct ust_buffer *buf;
- list_for_each_entry(bc, &blocked_consumers, list) {
- ltt_force_switch(bc->buf, FORCE_FLUSH);
+ list_for_each_entry(buf, &open_buffers_list,
+ open_buffers_list) {
+ ltt_force_switch(buf, FORCE_FLUSH);
}
}
-int process_client_cmd(char *recvbuf, struct ustcomm_source *src)
+static int process_client_cmd(char *recvbuf, int sock)
{
int result;
char trace_name[] = "auto";
char trace_type[] = "ustrelay";
int len;
- DBG("received a message! it's: %s", recvbuf);
len = strlen(recvbuf);
if (!strcmp(recvbuf, "print_markers")) {
@@ -878,7 +877,7 @@ int process_client_cmd(char *recvbuf, struct ustcomm_source *src)
print_markers(fp);
fclose(fp);
- result = ustcomm_send_reply(&ustcomm_app.server, ptr, src);
+ result = ustcomm_send_reply(ptr, sock);
free(ptr);
} else if (!strcmp(recvbuf, "print_trace_events")) {
@@ -897,7 +896,7 @@ int process_client_cmd(char *recvbuf, struct ustcomm_source *src)
print_trace_events(fp);
fclose(fp);
- result = ustcomm_send_reply(&ustcomm_app.server, ptr, src);
+ result = ustcomm_send_reply(ptr, sock);
if (result < 0) {
ERR("list_trace_events failed");
return -1;
@@ -1002,11 +1001,11 @@ int process_client_cmd(char *recvbuf, struct ustcomm_source *src)
return -1;
}
} else if (nth_token_is(recvbuf, "get_shmid", 0) == 1) {
- do_cmd_get_shmid(recvbuf, src);
+ do_cmd_get_shmid(recvbuf, sock);
} else if (nth_token_is(recvbuf, "get_n_subbufs", 0) == 1) {
- do_cmd_get_n_subbufs(recvbuf, src);
+ do_cmd_get_n_subbufs(recvbuf, sock);
} else if (nth_token_is(recvbuf, "get_subbuf_size", 0) == 1) {
- do_cmd_get_subbuf_size(recvbuf, src);
+ do_cmd_get_subbuf_size(recvbuf, sock);
} else if (nth_token_is(recvbuf, "load_probe_lib", 0) == 1) {
char *libfile;
@@ -1016,13 +1015,17 @@ int process_client_cmd(char *recvbuf, struct ustcomm_source *src)
free(libfile);
} else if (nth_token_is(recvbuf, "get_subbuffer", 0) == 1) {
- do_cmd_get_subbuffer(recvbuf, src);
- } else if (nth_token_is(recvbuf, "put_subbuffer", 0) == 1) {
- do_cmd_put_subbuffer(recvbuf, src);
+ do_cmd_get_subbuffer(recvbuf, sock);
+ }
+ else if(nth_token_is(recvbuf, "get_buffer_fd", 0) == 1) {
+ do_cmd_get_buffer_fd(recvbuf, sock);
+ }
+ else if(nth_token_is(recvbuf, "put_subbuffer", 0) == 1) {
+ do_cmd_put_subbuffer(recvbuf, sock);
} else if (nth_token_is(recvbuf, "set_subbuf_size", 0) == 1) {
- do_cmd_set_subbuf_size(recvbuf, src);
+ do_cmd_set_subbuf_size(recvbuf, sock);
} else if (nth_token_is(recvbuf, "set_subbuf_num", 0) == 1) {
- do_cmd_set_subbuf_num(recvbuf, src);
+ do_cmd_set_subbuf_num(recvbuf, sock);
} else if (nth_token_is(recvbuf, "enable_marker", 0) == 1) {
char *channel_slash_name = nth_token(recvbuf, 1);
char *channel_name = NULL;
@@ -1074,7 +1077,7 @@ int process_client_cmd(char *recvbuf, struct ustcomm_source *src)
goto next_cmd;
}
- result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
+ result = ustcomm_send_reply(reply, sock);
if (result) {
ERR("listener: get_pidunique: ustcomm_send_reply failed");
goto next_cmd;
@@ -1089,10 +1092,10 @@ int process_client_cmd(char *recvbuf, struct ustcomm_source *src)
SOCK_DIR);
goto next_cmd;
}
- result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
+ result = ustcomm_send_reply(reply, sock);
free(reply);
} else {
- result = ustcomm_send_reply(&ustcomm_app.server, reply, src);
+ result = ustcomm_send_reply(reply, sock);
}
if (result)
ERR("ustcomm_send_reply failed");
@@ -1112,28 +1115,54 @@ next_cmd:
return 0;
}
+
+
+
+#define MAX_EVENTS 10
+
+
+
void *listener_main(void *p)
{
- int result;
+ struct ustcomm_sock *epoll_sock;
+ struct epoll_event events[MAX_EVENTS];
+ struct sockaddr addr;
+ int accept_fd, nfds, result, i, addr_size;
DBG("LISTENER");
pthread_cleanup_push(listener_cleanup, NULL);
- for (;;) {
- struct mpentries mpent;
-
- multipoll_init(&mpent);
-
- blocked_consumers_add_to_mp(&mpent);
- ustcomm_mp_add_app_clients(&mpent, &ustcomm_app, process_client_cmd);
-
- result = multipoll_poll(&mpent, -1);
- if (result == -1) {
- ERR("error in multipoll_poll");
+ for(;;) {
+ nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
+ if (nfds == -1) {
+ ERR("epoll_wait");
+ continue;
}
- multipoll_destroy(&mpent);
+ for (i = 0; i < nfds; i++) {
+ epoll_sock = (struct ustcomm_sock *)events[i].data.ptr;
+ if (epoll_sock == listen_sock) {
+ addr_size = sizeof(struct sockaddr);
+ accept_fd = accept(epoll_sock->fd,
+ &addr,
+ (socklen_t *)&addr_size);
+ if (accept_fd == -1) {
+ ERR("accept failed\n");
+ }
+ ustcomm_init_sock(accept_fd, epoll_fd,
+ &ust_socks);
+ } else {
+ char *msg = NULL;
+ result = recv_message_conn(epoll_sock->fd, &msg);
+ if (result == 0) {
+ ustcomm_del_sock(epoll_sock, 0);
+ } else if (msg) {
+ process_client_cmd(msg, epoll_sock->fd);
+ free(msg);
+ }
+ }
+ }
}
pthread_cleanup_pop(1);
@@ -1183,11 +1212,6 @@ void create_listener(void)
}
}
-static int init_socket(void)
-{
- return ustcomm_init_app(getpid(), &ustcomm_app);
-}
-
#define AUTOPROBE_DISABLED 0
#define AUTOPROBE_ENABLE_ALL 1
#define AUTOPROBE_ENABLE_REGEX 2
@@ -1225,6 +1249,41 @@ static void auto_probe_connect(struct marker *m)
}
+static struct ustcomm_sock * init_app_socket(int epoll_fd)
+{
+ char *name;
+ int result;
+ struct ustcomm_sock *sock;
+
+ result = asprintf(&name, "%s/%d", SOCK_DIR, (int)getpid());
+ if (result < 0) {
+ ERR("string overflow allocating socket name, "
+ "UST thread bailing");
+ return NULL;
+ }
+
+ result = ensure_dir_exists(SOCK_DIR);
+ if (result == -1) {
+ ERR("Unable to create socket directory %s, UST thread bailing",
+ SOCK_DIR);
+ goto free_name;
+ }
+
+ sock = ustcomm_init_named_socket(name, epoll_fd);
+ if (!sock) {
+ ERR("Error initializing named socket (%s). Check that directory"
+ "exists and that it is writable. UST thread bailing", name);
+ goto free_name;
+ }
+
+ free(name);
+ return sock;
+
+free_name:
+ free(name);
+ return NULL;
+}
+
static void __attribute__((constructor)) init()
{
int result;
@@ -1242,9 +1301,18 @@ static void __attribute__((constructor)) init()
DBG("Tracectl constructor");
- result = init_socket();
- if (result == -1) {
- ERR("init_socket error");
+ /* Set up epoll */
+ epoll_fd = epoll_create(MAX_EVENTS);
+ if (epoll_fd == -1) {
+ ERR("epoll_create failed, tracing shutting down");
+ return;
+ }
+
+ /* Create the socket */
+ listen_sock = init_app_socket(epoll_fd);
+ if (!listen_sock) {
+ ERR("failed to create application socket,"
+ " tracing shutting down");
return;
}
@@ -1451,13 +1519,6 @@ static int trace_recording(void)
return retval;
}
-#if 0
-static int have_consumer(void)
-{
- return !list_empty(&blocked_consumers);
-}
-#endif
-
int restarting_usleep(useconds_t usecs)
{
struct timespec tv;
@@ -1545,8 +1606,8 @@ void ust_potential_exec(void)
static void ust_fork(void)
{
- struct blocked_consumer *bc;
- struct blocked_consumer *deletable_bc = NULL;
+ struct ust_buffer *buf, *buf_tmp;
+ struct ustcomm_sock *sock, *sock_tmp;
int result;
/* FIXME: technically, the locks could have been taken before the fork */
@@ -1557,26 +1618,47 @@ static void ust_fork(void)
ltt_trace_stop("auto");
ltt_trace_destroy("auto", 1);
- /* Delete all active connections */
- ustcomm_close_all_connections(&ustcomm_app.server);
+ /* Delete all active connections, but leave them in the epoll set */
+ list_for_each_entry_safe(sock, sock_tmp, &ust_socks, list) {
+ ustcomm_del_sock(sock, 1);
+ }
/* Delete all blocked consumers */
- list_for_each_entry(bc, &blocked_consumers, list) {
- result = close(bc->fd_producer);
- if (result == -1) {
+ list_for_each_entry_safe(buf, buf_tmp, &open_buffers_list,
+ open_buffers_list) {
+ result = close(buf->data_ready_fd_read);
+ if(result == -1) {
PERROR("close");
}
- free(deletable_bc);
- deletable_bc = bc;
- list_del(&bc->list);
+ result = close(buf->data_ready_fd_write);
+ if(result == -1) {
+ PERROR("close");
+ }
+ list_del(&buf->open_buffers_list);
}
- /* free app, keeping socket file */
- ustcomm_fini_app(&ustcomm_app, 1);
+ /* Clean up the listener socket and epoll, keeping the scoket file */
+ ustcomm_del_named_sock(listen_sock, 1);
+ close(epoll_fd);
+ /* Re-start the launch sequence */
STORE_SHARED(buffers_to_export, 0);
have_listener = 0;
- init_socket();
+
+ /* Set up epoll */
+ epoll_fd = epoll_create(MAX_EVENTS);
+ if (epoll_fd == -1) {
+ ERR("epoll_create failed, tracing shutting down");
+ return;
+ }
+
+ /* Create the socket */
+ listen_sock = init_app_socket(epoll_fd);
+ if (!listen_sock) {
+ ERR("failed to create application socket,"
+ " tracing shutting down");
+ return;
+ }
create_listener();
ltt_trace_setup("auto");
result = ltt_trace_set_type("auto", "ustrelay");
diff --git a/libustcmd/ustcmd.c b/libustcmd/ustcmd.c
index c512320..ac90f6c 100644
--- a/libustcmd/ustcmd.c
+++ b/libustcmd/ustcmd.c
@@ -52,7 +52,12 @@ pid_t *ustcmd_get_online_pids(void)
!!strcmp(dirent->d_name, "ustd")) {
sscanf(dirent->d_name, "%u", (unsigned int *) &ret[i]);
- if (pid_is_online(ret[i])) {
+ /* FIXME: Here we previously called pid_is_online, which
+ * always returned 1, now I replaced it with just 1.
+ * We need to figure out an intelligent way of solving
+ * this, maybe connect-disconnect.
+ */
+ if (1) {
ret_size += sizeof(pid_t);
ret = (pid_t *) realloc(ret, ret_size);
++i;
@@ -592,17 +597,17 @@ int ustcmd_force_switch(pid_t pid)
int ustcmd_send_cmd(const char *cmd, const pid_t pid, char **reply)
{
- struct ustcomm_connection conn;
+ int app_fd;
int retval;
- if (ustcomm_connect_app(pid, &conn)) {
+ if (ustcomm_connect_app(pid, &app_fd)) {
ERR("could not connect to PID %u", (unsigned int) pid);
return -1;
}
- retval = ustcomm_send_request(&conn, cmd, reply);
+ retval = ustcomm_send_request(app_fd, cmd, reply);
- ustcomm_close_app(&conn);
+ close(app_fd);
return retval;
}
diff --git a/libustcomm/Makefile.am b/libustcomm/Makefile.am
index 2672071..3ae96d5 100644
--- a/libustcomm/Makefile.am
+++ b/libustcomm/Makefile.am
@@ -4,8 +4,7 @@ AM_CFLAGS = -fno-strict-aliasing
noinst_LTLIBRARIES = libustcomm.la
libustcomm_la_SOURCES = \
ustcomm.h \
- ustcomm.c \
- multipoll.h \
- multipoll.c
+ ustcomm.c
+
libustcomm_la_LDFLAGS = -no-undefined -static
libustcomm_la_CFLAGS = -DUST_COMPONENT="libustcomm" -fPIC -fno-strict-aliasing
diff --git a/libustcomm/multipoll.c b/libustcomm/multipoll.c
deleted file mode 100644
index 80426e3..0000000
--- a/libustcomm/multipoll.c
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * multipoll.c
- *
- * Copyright (C) 2010 - Pierre-Marc Fournier (pierre-marc dot fournier at polymtl dot ca)
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- */
-
-/* Multipoll is a framework to poll on several file descriptors and to call
- * a specific callback depending on the fd that had activity.
- */
-
-#include <poll.h>
-#include <stdlib.h>
-#include "multipoll.h"
-#include "usterr.h"
-
-#define INITIAL_N_AVAIL 16
-
-/* multipoll_init
- *
- * Initialize an mpentries struct, which is initially empty of any fd.
- */
-
-int multipoll_init(struct mpentries *ent)
-{
- ent->n_used = 0;
- ent->n_avail = INITIAL_N_AVAIL;
-
- ent->pollfds = (struct pollfd *) zmalloc(sizeof(struct pollfd) * INITIAL_N_AVAIL);
- ent->extras = (struct pollfd_extra *) zmalloc(sizeof(struct pollfd_extra) * INITIAL_N_AVAIL);
-
- return 0;
-}
-
-/* multipoll_destroy: free a struct mpentries
- */
-
-int multipoll_destroy(struct mpentries *ent)
-{
- int i;
-
- for(i=0; i<ent->n_used; i++) {
- if(ent->extras[i].destroy_priv) {
- ent->extras[i].destroy_priv(ent->extras[i].priv);
- }
- }
-
- free(ent->pollfds);
- free(ent->extras);
-
- return 0;
-}
-
-/* multipoll_add
- *
- * Add a file descriptor to be waited on in a struct mpentries.
- *
- * @ent: the struct mpentries to add an fd to
- * @fd: the fd to wait on
- * @events: a mask of the types of events to wait on, see the poll(2) man page
- * @func: the callback function to be called if there is activity on the fd
- * @priv: the private pointer to pass to func
- * @destroy_priv: a callback to destroy the priv pointer when the mpentries
- is destroyed; may be NULL
- */
-
-int multipoll_add(struct mpentries *ent, int fd, short events, int (*func)(void *priv, int fd, short events), void *priv, int (*destroy_priv)(void *))
-{
- int cur;
-
- if(ent->n_used == ent->n_avail) {
- ent->n_avail *= 2;
- ent->pollfds = (struct pollfd *) realloc(ent->pollfds, sizeof(struct pollfd) * ent->n_avail);
- ent->extras = (struct pollfd_extra *) realloc(ent->extras, sizeof(struct pollfd_extra) * ent->n_avail);
- }
-
- cur = ent->n_used;
- ent->n_used++;
-
- ent->pollfds[cur].fd = fd;
- ent->pollfds[cur].events = events;
- ent->extras[cur].func = func;
- ent->extras[cur].priv = priv;
- ent->extras[cur].destroy_priv = destroy_priv;
-
- return 0;
-}
-
-/* multipoll_poll: do the actual poll on a struct mpentries
- *
- * File descriptors should have been already added with multipoll_add().
- *
- * A struct mpentries may be reused for multiple multipoll_poll calls.
- *
- * @ent: the struct mpentries to poll on.
- * @timeout: the timeout after which to return if there was no activity.
- */
-
-int multipoll_poll(struct mpentries *ent, int timeout)
-{
- int result;
- int i;
-
- result = poll(ent->pollfds, ent->n_used, timeout);
- if(result == -1) {
- PERROR("poll");
- return -1;
- }
-
- for(i=0; i<ent->n_used; i++) {
- if(ent->pollfds[i].revents) {
- ent->extras[i].func(ent->extras[i].priv, ent->pollfds[i].fd, ent->pollfds[i].revents);
- }
- }
-
- return 0;
-}
diff --git a/libustcomm/multipoll.h b/libustcomm/multipoll.h
deleted file mode 100644
index 8a0124f..0000000
--- a/libustcomm/multipoll.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * multipoll.h
- *
- * Copyright (C) 2010 - Pierre-Marc Fournier (pierre-marc dot fournier at polymtl dot ca)
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- */
-
-#ifndef UST_MULTIPOLL_H
-#define UST_MULTIPOLL_H
-
-struct pollfd_extra {
- int (*func)(void *priv, int fd, short events);
- void *priv;
-
- int (*destroy_priv)(void *priv);
-};
-
-struct mpentries {
- struct pollfd *pollfds;
- struct pollfd_extra *extras;
-
- int n_used;
- int n_avail;
-};
-
-extern int multipoll_init(struct mpentries *ent);
-extern int multipoll_add(struct mpentries *ent, int fd, short events, int (*func)(void *priv, int fd, short events), void *priv, int (*destroy_priv)(void *));
-extern int multipoll_destroy(struct mpentries *ent);
-extern int multipoll_poll(struct mpentries *ent, int timeout);
-
-#endif /* UST_MULTIPOLL_H */
diff --git a/libustcomm/ustcomm.c b/libustcomm/ustcomm.c
index 567c5d1..d55ac4c 100644
--- a/libustcomm/ustcomm.c
+++ b/libustcomm/ustcomm.c
@@ -25,6 +25,7 @@
#include <sys/un.h>
#include <unistd.h>
#include <poll.h>
+#include <sys/epoll.h>
#include <sys/stat.h>
#include <stdio.h>
@@ -35,9 +36,6 @@
#include "ustcomm.h"
#include "usterr.h"
#include "share.h"
-#include "multipoll.h"
-
-#define UNIX_PATH_MAX 108
static int mkdir_p(const char *path, mode_t mode)
{
@@ -91,430 +89,450 @@ static int mkdir_p(const char *path, mode_t mode)
return retval;
}
-static int signal_process(pid_t pid)
+static struct sockaddr_un * create_sock_addr(const char *name,
+ size_t *sock_addr_size)
{
- return 0;
-}
+ struct sockaddr_un * addr;
+ size_t alloc_size;
-void ustcomm_init_connection(struct ustcomm_connection *conn)
-{
- conn->recv_buf = NULL;
- conn->recv_buf_size = 0;
- conn->recv_buf_alloc = 0;
-}
+ alloc_size = (size_t) (((struct sockaddr_un *) 0)->sun_path) +
+ strlen(name) + 1;
-int pid_is_online(pid_t pid) {
- return 1;
-}
+ addr = malloc(alloc_size);
+ if (!addr) { /* malloc reports failure with NULL, never a negative pointer */
+ ERR("allocating addr failed");
+ return NULL;
+ }
-/* Send a message
- *
- * @fd: file descriptor to send to
- * @msg: a null-terminated string containing the message to send
- *
- * Return value:
- * -1: error
- * 0: connection closed
- * 1: success
- */
+ addr->sun_family = AF_UNIX;
+ strcpy(addr->sun_path, name);
+
+ *sock_addr_size = alloc_size;
+
+ return addr;
+}
-static int send_message_fd(int fd, const char *msg)
+struct ustcomm_sock * ustcomm_init_sock(int fd, int epoll_fd,
+ struct list_head *list)
{
- int result;
+ struct epoll_event ev;
+ struct ustcomm_sock *sock;
- /* Send including the final \0 */
- result = patient_send(fd, msg, strlen(msg)+1, MSG_NOSIGNAL);
- if(result == -1) {
- if(errno != EPIPE)
- PERROR("send");
- return -1;
+ sock = malloc(sizeof(struct ustcomm_sock));
+ if (!sock) {
+ perror("malloc: couldn't allocate ustcomm_sock");
+ return NULL;
}
- else if(result == 0) {
- return 0;
+
+ ev.events = EPOLLIN;
+ ev.data.ptr = sock;
+ sock->fd = fd;
+
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock->fd, &ev) == -1) {
+ PERROR("epoll_ctl: failed to add socket"); /* no embedded newline: it would split the errno text from the message */
+ free(sock);
+ return NULL;
}
- DBG("sent message \"%s\"", msg);
- return 1;
+ sock->epoll_fd = epoll_fd;
+ if (list) {
+ list_add(&sock->list, list);
+ } else {
+ INIT_LIST_HEAD(&sock->list);
+ }
+
+ return sock;
}
-/* Called by an app to ask the consumer daemon to connect to it. */
+void ustcomm_del_sock(struct ustcomm_sock *sock, int keep_in_epoll)
+{
+ list_del(&sock->list);
+ if (!keep_in_epoll) {
+ if (epoll_ctl(sock->epoll_fd, EPOLL_CTL_DEL, sock->fd, NULL) == -1) {
+ PERROR("epoll_ctl: failed to delete socket");
+ }
+ }
+ close(sock->fd);
+ free(sock);
+}
-int ustcomm_request_consumer(pid_t pid, const char *channel)
+struct ustcomm_sock * ustcomm_init_named_socket(const char *name,
+ int epoll_fd)
{
- char path[UNIX_PATH_MAX];
int result;
- char *msg=NULL;
- int retval = 0;
- struct ustcomm_connection conn;
- char *explicit_daemon_socket_path;
+ int fd;
+ size_t sock_addr_size;
+ struct sockaddr_un * addr;
+ struct ustcomm_sock *sock;
- explicit_daemon_socket_path = getenv("UST_DAEMON_SOCKET");
- if(explicit_daemon_socket_path) {
- /* user specified explicitly a socket path */
- result = snprintf(path, UNIX_PATH_MAX, "%s", explicit_daemon_socket_path);
- }
- else {
- /* just use the default path */
- result = snprintf(path, UNIX_PATH_MAX, "%s/ustd", SOCK_DIR);
+ fd = socket(PF_UNIX, SOCK_STREAM, 0);
+ if(fd == -1) {
+ PERROR("socket");
+ return NULL;
}
- if(result >= UNIX_PATH_MAX) {
- ERR("string overflow allocating socket name");
- return -1;
+ addr = create_sock_addr(name, &sock_addr_size);
+ if (addr == NULL) {
+ ERR("allocating addr, UST thread bailing");
+ goto close_sock;
}
- if (asprintf(&msg, "collect %d %s", pid, channel) < 0) {
- ERR("ustcomm_request_consumer : asprintf failed (collect %d/%s)",
- pid, channel);
- return -1;
+ result = access(name, F_OK);
+ if(result == 0) {
+ /* file exists */
+ result = unlink(name);
+ if(result == -1) {
+ PERROR("unlink of socket file");
+ goto free_addr;
+ }
+ DBG("socket already exists; overwriting");
}
- /* don't signal it because it's the daemon */
- result = ustcomm_connect_path(path, &conn, -1);
+ result = bind(fd, (struct sockaddr *)addr, sock_addr_size);
if(result == -1) {
- WARN("ustcomm_connect_path failed");
- retval = -1;
- goto del_string;
+ PERROR("bind");
+ goto free_addr;
}
- result = ustcomm_send_request(&conn, msg, NULL);
+ result = listen(fd, 1);
if(result == -1) {
- WARN("ustcomm_send_request failed");
- retval = -1;
- goto disconnect;
+ PERROR("listen");
+ goto free_addr;
}
- disconnect:
- ustcomm_disconnect(&conn);
- del_string:
- free(msg);
+ sock = ustcomm_init_sock(fd, epoll_fd,
+ NULL);
+ if (!sock) {
+ ERR("failed to create ustcomm_sock");
+ goto free_addr;
+ }
- return retval;
-}
+ free(addr);
-/* returns 1 to indicate a message was received
- * returns 0 to indicate no message was received (end of stream)
- * returns -1 to indicate an error
- */
+ return sock;
-#define RECV_INCREMENT 1000
-#define RECV_INITIAL_BUF_SIZE 10
+free_addr:
+ free(addr);
+close_sock:
+ close(fd);
-static int recv_message_fd(int fd, char **recv_buf, int *recv_buf_size, int *recv_buf_alloc, char **msg)
+ return NULL;
+}
+
+void ustcomm_del_named_sock(struct ustcomm_sock *sock,
+ int keep_socket_file)
{
- int result;
+ int result, fd;
+ struct stat st;
+ struct sockaddr dummy;
+ struct sockaddr_un *sockaddr = NULL;
+ int alloc_size;
- /* 1. Check if there is a message in the buf */
- /* 2. If not, do:
- 2.1 receive chunk and put it in buffer
- 2.2 process full message if there is one
- -- while no message arrived
- */
+ fd = sock->fd;
- for(;;) {
- int i;
- int nulfound = 0;
+ if(!keep_socket_file) {
- /* Search for full message in buffer */
- for(i=0; i<*recv_buf_size; i++) {
- if((*recv_buf)[i] == '\0') {
- nulfound = 1;
- break;
- }
+ /* Get the socket name */
+ alloc_size = sizeof(dummy);
+ if (getsockname(fd, &dummy, (socklen_t *)&alloc_size) < 0) {
+ PERROR("getsockname failed");
+ return;
}
- /* Process found message */
- if(nulfound == 1) {
- char *newbuf;
-
- if(i == 0) {
- /* problem */
- WARN("received empty message");
- }
- *msg = strndup(*recv_buf, i);
-
- /* Remove processed message from buffer */
- newbuf = (char *) malloc(*recv_buf_size - (i+1));
- memcpy(newbuf, *recv_buf + (i+1), *recv_buf_size - (i+1));
- free(*recv_buf);
- *recv_buf = newbuf;
- *recv_buf_size -= (i+1);
- *recv_buf_alloc -= (i+1);
-
- return 1;
+ sockaddr = malloc(alloc_size);
+ if (!sockaddr) {
+ ERR("failed to allocate sockaddr");
+ return;
}
- /* Receive a chunk from the fd */
- if(*recv_buf_alloc - *recv_buf_size < RECV_INCREMENT) {
- *recv_buf_alloc += RECV_INCREMENT - (*recv_buf_alloc - *recv_buf_size);
- *recv_buf = (char *) realloc(*recv_buf, *recv_buf_alloc);
+ if (getsockname(fd, sockaddr, (socklen_t *)&alloc_size) < 0) {
+ PERROR("getsockname failed");
+ goto free_sockaddr;
}
- result = recv(fd, *recv_buf+*recv_buf_size, RECV_INCREMENT, 0);
+ /* Destroy socket */
+ result = stat(sockaddr->sun_path, &st);
if(result == -1) {
- if(errno == ECONNRESET) {
- *recv_buf_size = 0;
- return 0;
- }
- else if(errno == EINTR) {
- return -1;
- }
- else {
- PERROR("recv");
- return -1;
- }
+ PERROR("stat (%s)", sockaddr->sun_path);
+ goto free_sockaddr;
}
- if(result == 0) {
- return 0;
+
+ /* Paranoid check before deleting. */
+ result = S_ISSOCK(st.st_mode);
+ if(!result) {
+ ERR("The socket we are about to delete is not a socket.");
+ goto free_sockaddr;
}
- *recv_buf_size += result;
- /* Go back to the beginning to check if there is a full message in the buffer */
+ result = unlink(sockaddr->sun_path);
+ if(result == -1) {
+ PERROR("unlink");
+ }
}
- DBG("received message \"%s\"", *recv_buf);
+ ustcomm_del_sock(sock, keep_socket_file);
- return 1;
+free_sockaddr:
+ free(sockaddr);
}
-static int recv_message_conn(struct ustcomm_connection *conn, char **msg)
-{
- return recv_message_fd(conn->fd, &conn->recv_buf, &conn->recv_buf_size, &conn->recv_buf_alloc, msg);
-}
-int ustcomm_send_reply(struct ustcomm_server *server, char *msg, struct ustcomm_source *src)
+/* Called by an app to ask the consumer daemon to connect to it. */
+
+int ustcomm_request_consumer(pid_t pid, const char *channel)
{
- int result;
+ int result, daemon_fd;
+ int retval = 0;
+ char *msg=NULL;
+ char *explicit_daemon_socket_path, *daemon_path;
- result = send_message_fd(src->fd, msg);
- if(result < 0) {
- ERR("error in send_message_fd");
+ explicit_daemon_socket_path = getenv("UST_DAEMON_SOCKET");
+ if (explicit_daemon_socket_path) {
+ /* user specified explicitly a socket path */
+ result = asprintf(&daemon_path, "%s", explicit_daemon_socket_path);
+ } else {
+ /* just use the default path */
+ result = asprintf(&daemon_path, "%s/ustd", SOCK_DIR);
+ }
+ if (result == -1) {
+ ERR("string overflow allocating socket name");
return -1;
}
- return 0;
-}
-
-/* Called after a fork. */
+ if (asprintf(&msg, "collect %d %s", pid, channel) < 0) {
+ ERR("ustcomm_request_consumer : asprintf failed (collect %d/%s)",
+ pid, channel);
+ retval = -1;
+ goto free_daemon_path;
+ }
-int ustcomm_close_all_connections(struct ustcomm_server *server)
-{
- struct ustcomm_connection *conn;
- struct ustcomm_connection *deletable_conn = NULL;
+ result = ustcomm_connect_path(daemon_path, &daemon_fd);
+ if (result == -1) {
+ WARN("ustcomm_connect_path failed, daemon_path: %s",
+ daemon_path);
+ retval = -1;
+ goto del_string;
+ }
- list_for_each_entry(conn, &server->connections, list) {
- free(deletable_conn);
- deletable_conn = conn;
- ustcomm_close_app(conn);
- list_del(&conn->list);
+ result = ustcomm_send_request(daemon_fd, msg, NULL);
+ if (result == -1) {
+ WARN("ustcomm_send_request failed, daemon path: %s",
+ daemon_path);
+ retval = -1;
}
- return 0;
+ close(daemon_fd);
+del_string:
+ free(msg);
+free_daemon_path:
+ free(daemon_path);
+
+ return retval;
}
-/* @timeout: max blocking time in milliseconds, -1 means infinity
- *
- * returns 1 to indicate a message was received
- * returns 0 to indicate no message was received
+/* returns 1 to indicate a message was received
+ * returns 0 to indicate no message was received (end of stream)
* returns -1 to indicate an error
*/
-
-int ustcomm_recv_message(struct ustcomm_server *server, char **msg, struct ustcomm_source *src, int timeout)
+int ustcomm_recv_fd(int sock,
+ struct ustcomm_header *header,
+ char **data, int *fd)
{
- struct pollfd *fds;
- struct ustcomm_connection **conn_table;
- struct ustcomm_connection *conn;
int result;
int retval;
-
- for(;;) {
- int idx = 0;
- int n_fds = 1;
-
- list_for_each_entry(conn, &server->connections, list) {
- n_fds++;
- }
-
- fds = (struct pollfd *) zmalloc(n_fds * sizeof(struct pollfd));
- if(fds == NULL) {
- ERR("zmalloc returned NULL");
+ struct ustcomm_header peek_header;
+ struct iovec iov[2];
+ struct msghdr msg;
+ struct cmsghdr *cmsg;
+ char buf[CMSG_SPACE(sizeof(int))];
+
+ result = recv(sock, &peek_header, sizeof(peek_header),
+ MSG_PEEK | MSG_WAITALL);
+ if (result <= 0) {
+ /* errno is only meaningful when recv failed (result < 0); 0 means end of stream */
+ if (result == 0 || errno == ECONNRESET) {
+ return 0;
+ } else if (errno == EINTR) {
+ return -1;
+ } else {
+ PERROR("recv");
+ return -1;
+ }
+ }
- conn_table = (struct ustcomm_connection **) zmalloc(n_fds * sizeof(struct ustcomm_connection *));
- if(conn_table == NULL) {
- ERR("zmalloc returned NULL");
- retval = -1;
- goto free_fds_return;
- }
+ memset(&msg, 0, sizeof(msg));
- /* special idx 0 is for listening socket */
- fds[idx].fd = server->listen_fd;
- fds[idx].events = POLLIN;
- idx++;
+ iov[0].iov_base = (char *)header;
+ iov[0].iov_len = sizeof(struct ustcomm_header);
- list_for_each_entry(conn, &server->connections, list) {
- fds[idx].fd = conn->fd;
- fds[idx].events = POLLIN;
- conn_table[idx] = conn;
- idx++;
- }
+ msg.msg_iov = iov;
+ msg.msg_iovlen = 1;
- result = poll(fds, n_fds, timeout);
- if(result == -1 && errno == EINTR) {
- /* That's ok. ustd receives signals to notify it must shutdown. */
- retval = -1;
- goto free_conn_table_return;
- }
- else if(result == -1) {
- PERROR("poll");
- retval = -1;
- goto free_conn_table_return;
+ if (peek_header.size) {
+ if (peek_header.size < 0 || peek_header.size > 100) {
+ WARN("big peek header! %ld", peek_header.size); /* size is declared long in struct ustcomm_header */
}
- else if(result == 0) {
- retval = 0;
- goto free_conn_table_return;
+ *data = malloc(peek_header.size);
+ if (!*data) {
+ ERR("failed to allocate space for message");
}
- if(fds[0].revents) {
- struct ustcomm_connection *newconn;
- int newfd;
+ iov[1].iov_base = (char *)*data;
+ iov[1].iov_len = peek_header.size;
- result = newfd = accept(server->listen_fd, NULL, NULL);
- if(result == -1) {
- PERROR("accept");
- retval = -1;
- goto free_conn_table_return;
- }
+ msg.msg_iovlen++;
+ }
- newconn = (struct ustcomm_connection *) zmalloc(sizeof(struct ustcomm_connection));
- if(newconn == NULL) {
- ERR("zmalloc returned NULL");
- return -1;
- }
+ if (fd && peek_header.fd_included) {
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+ }
- ustcomm_init_connection(newconn);
- newconn->fd = newfd;
+ result = recvmsg(sock, &msg,
+ MSG_WAITALL);
- list_add(&newconn->list, &server->connections);
+ if (result <= 0) {
+ /* errno is only meaningful when recvmsg failed (result < 0); */
+ /* a return of 0 means the peer closed the stream */
+ if (result == 0 || errno == ECONNRESET) {
+ retval = 0;
+ } else if (errno == EINTR) {
+ retval = -1;
+ } else {
+ PERROR("recv");
+ retval = -1;
+ }
-
- for(idx=1; idx<n_fds; idx++) {
- if(fds[idx].revents) {
- retval = recv_message_conn(conn_table[idx], msg);
- if(src)
- src->fd = fds[idx].fd;
-
- if(retval == 0) {
- /* connection finished */
- list_for_each_entry(conn, &server->connections, list) {
- if(conn->fd == fds[idx].fd) {
- ustcomm_close_app(conn);
- list_del(&conn->list);
- free(conn);
- break;
- }
- }
- }
- else {
- goto free_conn_table_return;
- }
+ if (peek_header.size) { free(*data); } /* *data is only assigned when the header announced a payload */
+ return retval;
+ }
+
+ if (fd && peek_header.fd_included) {
+ cmsg = CMSG_FIRSTHDR(&msg);
+ result = 0;
+ while (cmsg != NULL) {
+ if (cmsg->cmsg_level == SOL_SOCKET
+ && cmsg->cmsg_type == SCM_RIGHTS) {
+ *fd = *(int *) CMSG_DATA(cmsg);
+ result = 1;
+ break;
}
+ cmsg = CMSG_NXTHDR(&msg, cmsg);
+ }
+ if (!result) {
+ ERR("Failed to receive file descriptor\n");
}
-
- free(fds);
- free(conn_table);
}
-free_conn_table_return:
- free(conn_table);
-free_fds_return:
- free(fds);
- return retval;
+ return 1;
}
-int ustcomm_ustd_recv_message(struct ustcomm_ustd *ustd, char **msg, struct ustcomm_source *src, int timeout)
+int ustcomm_recv(int sock,
+ struct ustcomm_header *header,
+ char **data)
{
- return ustcomm_recv_message(&ustd->server, msg, src, timeout);
+ return ustcomm_recv_fd(sock, header, data, NULL);
}
-int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm_source *src, int timeout)
+
+int recv_message_conn(int sock, char **msg)
{
- return ustcomm_recv_message(&app->server, msg, src, timeout);
-}
+ struct ustcomm_header header;
-/* This removes src from the list of active connections of app.
- */
+ return ustcomm_recv(sock, &header, msg);
+}
-int ustcomm_app_detach_client(struct ustcomm_app *app, struct ustcomm_source *src)
+int ustcomm_send_fd(int sock,
+ const struct ustcomm_header *header,
+ const char *data,
+ int *fd)
{
- struct ustcomm_server *server = (struct ustcomm_server *)app;
- struct ustcomm_connection *conn;
+ struct iovec iov[2];
+ struct msghdr msg;
+ int result;
+ struct cmsghdr *cmsg;
+ char buf[CMSG_SPACE(sizeof(int))];
+
+ memset(&msg, 0, sizeof(msg));
+
+ iov[0].iov_base = (char *)header;
+ iov[0].iov_len = sizeof(struct ustcomm_header);
+
+ msg.msg_iov = iov;
+ msg.msg_iovlen = 1;
+
+ if (header->size) {
+ iov[1].iov_base = (char *)data;
+ iov[1].iov_len = header->size;
+
+ msg.msg_iovlen++;
- list_for_each_entry(conn, &server->connections, list) {
- if(conn->fd == src->fd) {
- list_del(&conn->list);
- goto found;
- }
}
- return -1;
-found:
- return src->fd;
+ if (fd && header->fd_included) {
+ msg.msg_control = buf;
+ msg.msg_controllen = sizeof(buf);
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+ *(int *) CMSG_DATA(cmsg) = *fd;
+ msg.msg_controllen = cmsg->cmsg_len;
+ }
+
+ result = sendmsg(sock, &msg, MSG_NOSIGNAL);
+ if (result < 0 && errno != EPIPE) {
+ PERROR("sendmsg failed");
+ }
+ return result;
}
-static int init_named_socket(const char *name, char **path_out)
+int ustcomm_send(int sock,
+ const struct ustcomm_header *header,
+ const char *data)
{
- int result;
- int fd;
+ return ustcomm_send_fd(sock, header, data, NULL);
+}
- struct sockaddr_un addr;
-
- result = fd = socket(PF_UNIX, SOCK_STREAM, 0);
- if(result == -1) {
- PERROR("socket");
- return -1;
- }
+int ustcomm_send_reply(char *msg, int sock)
+{
+ int result;
+ struct ustcomm_header header;
- addr.sun_family = AF_UNIX;
+ memset(&header, 0, sizeof(header));
- strncpy(addr.sun_path, name, UNIX_PATH_MAX);
- addr.sun_path[UNIX_PATH_MAX-1] = '\0';
+ header.size = strlen(msg) + 1;
- result = access(name, F_OK);
- if(result == 0) {
- /* file exists */
- result = unlink(name);
- if(result == -1) {
- PERROR("unlink of socket file");
- goto close_sock;
- }
- DBG("socket already exists; overwriting");
+ result = ustcomm_send(sock, &header, msg);
+ if(result < 0) {
+ ERR("error in ustcomm_send");
+ return result;
}
- result = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
- if(result == -1) {
- PERROR("bind");
- goto close_sock;
- }
+ return 0;
+}
- result = listen(fd, 1);
- if(result == -1) {
- PERROR("listen");
- goto close_sock;
- }
+int ustcomm_send_req(int sock,
+ const struct ustcomm_header *req_header,
+ const char *data,
+ char **response)
+{
+ int result;
+ struct ustcomm_header res_header;
- if(path_out) {
- *path_out = strdup(addr.sun_path);
+ result = ustcomm_send(sock, req_header, data);
+ if ( result <= 0) {
+ return result;
}
- return fd;
+ if (!response) {
+ return 1;
+ }
- close_sock:
- close(fd);
+ return ustcomm_recv(sock,
+ &res_header,
+ response);
- return -1;
}
/*
@@ -527,27 +545,17 @@ static int init_named_socket(const char *name, char **path_out)
* ECONNRESET, which is normal when the application dies.
*/
-int ustcomm_send_request(struct ustcomm_connection *conn, const char *req, char **reply)
+int ustcomm_send_request(int sock, const char *req, char **reply)
{
- int result;
+ struct ustcomm_header req_header = { 0 }; /* the whole header is sent, so no field may be left uninitialized */
- /* Send including the final \0 */
- result = send_message_fd(conn->fd, req);
- if(result != 1)
- return result;
+ req_header.size = strlen(req) + 1;
- if(!reply)
- return 1;
+ return ustcomm_send_req(sock,
+ &req_header,
+ req,
+ reply);
- result = recv_message_conn(conn, reply);
- if(result == -1) {
- return -1;
- }
- else if(result == 0) {
- return 0;
- }
-
- return 1;
}
/* Return value:
@@ -555,52 +563,45 @@ int ustcomm_send_request(struct ustcomm_connection *conn, const char *req, char
* -1: error
*/
-int ustcomm_connect_path(const char *path, struct ustcomm_connection *conn, pid_t signalpid)
+int ustcomm_connect_path(const char *name, int *connection_fd)
{
- int fd;
- int result;
- struct sockaddr_un addr;
+ int result, fd;
+ size_t sock_addr_size;
+ struct sockaddr_un *addr;
- ustcomm_init_connection(conn);
-
- result = fd = socket(PF_UNIX, SOCK_STREAM, 0);
- if(result == -1) {
+ fd = socket(PF_UNIX, SOCK_STREAM, 0);
+ if(fd == -1) {
PERROR("socket");
return -1;
}
- addr.sun_family = AF_UNIX;
-
- result = snprintf(addr.sun_path, UNIX_PATH_MAX, "%s", path);
- if(result >= UNIX_PATH_MAX) {
- ERR("string overflow allocating socket name");
- return -1;
- }
-
- if(signalpid >= 0) {
- result = signal_process(signalpid);
- if(result == -1) {
- ERR("could not signal process");
- return -1;
- }
+ addr = create_sock_addr(name, &sock_addr_size);
+ if (addr == NULL) {
+ ERR("allocating addr failed");
+ goto close_sock;
}
- result = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
+ result = connect(fd, (struct sockaddr *)addr, sock_addr_size);
if(result == -1) {
- PERROR("connect (path=%s)", path);
- return -1;
+ PERROR("connect (path=%s)", name);
+ goto free_sock_addr;
}
- conn->fd = fd;
+ *connection_fd = fd;
+
+ free(addr);
return 0;
-}
-int ustcomm_disconnect(struct ustcomm_connection *conn)
-{
- return close(conn->fd);
+free_sock_addr:
+ free(addr);
+close_sock:
+ close(fd);
+
+ return -1;
}
+
/* Open a connection to a traceable app.
*
* Return value:
@@ -608,35 +609,30 @@ int ustcomm_disconnect(struct ustcomm_connection *conn)
* -1: error
*/
-int ustcomm_connect_app(pid_t pid, struct ustcomm_connection *conn)
+int ustcomm_connect_app(pid_t pid, int *app_fd)
{
int result;
- char path[UNIX_PATH_MAX];
-
+ int retval = 0;
+ char *name;
- result = snprintf(path, UNIX_PATH_MAX, "%s/%d", SOCK_DIR, pid);
- if(result >= UNIX_PATH_MAX) {
- ERR("string overflow allocating socket name");
+ result = asprintf(&name, "%s/%d", SOCK_DIR, pid);
+ if (result < 0) {
+ ERR("failed to allocate socket name");
return -1;
}
- return ustcomm_connect_path(path, conn, pid);
-}
-
-/* Close a connection to a traceable app. It frees the
- * resources. It however does not free the
- * ustcomm_connection itself.
- */
+ result = ustcomm_connect_path(name, app_fd);
+ if (result < 0) {
+ ERR("failed to connect to app");
+ retval = -1;
+ }
-int ustcomm_close_app(struct ustcomm_connection *conn)
-{
- close(conn->fd);
- free(conn->recv_buf);
+ free(name);
- return 0;
+ return retval;
}
-static int ensure_dir_exists(const char *dir)
+int ensure_dir_exists(const char *dir)
{
struct stat st;
int result;
@@ -663,139 +659,10 @@ static int ensure_dir_exists(const char *dir)
return 0;
}
-/* Called by an application to initialize its server so daemons can
- * connect to it.
- */
-
-int ustcomm_init_app(pid_t pid, struct ustcomm_app *handle)
-{
- int result;
- char *name;
-
- result = asprintf(&name, "%s/%d", SOCK_DIR, (int)pid);
- if(result >= UNIX_PATH_MAX) {
- ERR("string overflow allocating socket name");
- return -1;
- }
-
- result = ensure_dir_exists(SOCK_DIR);
- if(result == -1) {
- ERR("Unable to create socket directory %s", SOCK_DIR);
- return -1;
- }
-
- handle->server.listen_fd = init_named_socket(name, &(handle->server.socketpath));
- if(handle->server.listen_fd < 0) {
- ERR("Error initializing named socket (%s). Check that directory exists and that it is writable.", name);
- goto free_name;
- }
- free(name);
-
- INIT_LIST_HEAD(&handle->server.connections);
-
- return 0;
-
-free_name:
- free(name);
- return -1;
-}
-
/* Used by the daemon to initialize its server so applications
* can connect to it.
*/
-int ustcomm_init_ustd(struct ustcomm_ustd *handle, const char *sock_path)
-{
- char *name;
- int retval = 0;
-
- if(sock_path) {
- if (asprintf(&name, "%s", sock_path) < 0) {
- ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)",
- sock_path);
- return -1;
- }
- }
- else {
- int result;
-
- /* Only check if socket dir exists if we are using the default directory */
- result = ensure_dir_exists(SOCK_DIR);
- if(result == -1) {
- ERR("Unable to create socket directory %s", SOCK_DIR);
- return -1;
- }
-
- if (asprintf(&name, "%s/%s", SOCK_DIR, "ustd") < 0) {
- ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)",
- SOCK_DIR);
- return -1;
- }
- }
-
- handle->server.listen_fd = init_named_socket(name, &handle->server.socketpath);
- if(handle->server.listen_fd < 0) {
- ERR("error initializing named socket at %s", name);
- retval = -1;
- goto free_name;
- }
-
- INIT_LIST_HEAD(&handle->server.connections);
-
-free_name:
- free(name);
-
- return retval;
-}
-
-static void ustcomm_fini_server(struct ustcomm_server *server, int keep_socket_file)
-{
- int result;
- struct stat st;
-
- if(!keep_socket_file) {
- /* Destroy socket */
- result = stat(server->socketpath, &st);
- if(result == -1) {
- PERROR("stat (%s)", server->socketpath);
- return;
- }
-
- /* Paranoid check before deleting. */
- result = S_ISSOCK(st.st_mode);
- if(!result) {
- ERR("The socket we are about to delete is not a socket.");
- return;
- }
-
- result = unlink(server->socketpath);
- if(result == -1) {
- PERROR("unlink");
- }
- }
-
- free(server->socketpath);
-
- result = close(server->listen_fd);
- if(result == -1) {
- PERROR("close");
- return;
- }
-}
-
-/* Free a traceable application server */
-
-void ustcomm_fini_app(struct ustcomm_app *handle, int keep_socket_file)
-{
- ustcomm_fini_server(&handle->server, keep_socket_file);
-}
-
-/* Free a ustd server */
-
-void ustcomm_fini_ustd(struct ustcomm_ustd *handle)
-{
- ustcomm_fini_server(&handle->server, 0);
-}
static const char *find_tok(const char *str)
{
@@ -884,89 +751,3 @@ char *nth_token(const char *str, int tok_no)
return retval;
}
-
-/* Callback from multipoll.
- * Receive a new connection on the listening socket.
- */
-
-static int process_mp_incoming_conn(void *priv, int fd, short events)
-{
- struct ustcomm_connection *newconn;
- struct ustcomm_server *server = (struct ustcomm_server *) priv;
- int newfd;
- int result;
-
- result = newfd = accept(server->listen_fd, NULL, NULL);
- if(result == -1) {
- PERROR("accept");
- return -1;
- }
-
- newconn = (struct ustcomm_connection *) zmalloc(sizeof(struct ustcomm_connection));
- if(newconn == NULL) {
- ERR("zmalloc returned NULL");
- return -1;
- }
-
- ustcomm_init_connection(newconn);
- newconn->fd = newfd;
-
- list_add(&newconn->list, &server->connections);
-
- return 0;
-}
-
-/* Callback from multipoll.
- * Receive a message on an existing connection.
- */
-
-static int process_mp_conn_msg(void *priv, int fd, short revents)
-{
- struct ustcomm_multipoll_conn_info *mpinfo = (struct ustcomm_multipoll_conn_info *) priv;
- int result;
- char *msg;
- struct ustcomm_source src;
-
- if(revents) {
- src.fd = fd;
-
- result = recv_message_conn(mpinfo->conn, &msg);
- if(result == -1) {
- ERR("error in recv_message_conn");
- }
-
- else if(result == 0) {
- /* connection finished */
- ustcomm_close_app(mpinfo->conn);
- list_del(&mpinfo->conn->list);
- free(mpinfo->conn);
- }
- else {
- mpinfo->cb(msg, &src);
- free(msg);
- }
- }
-
- return 0;
-}
-
-int free_ustcomm_client_poll(void *data)
-{
- free(data);
- return 0;
-}
-
-void ustcomm_mp_add_app_clients(struct mpentries *ent, struct ustcomm_app *app, int (*cb)(char *recvbuf, struct ustcomm_source *src))
-{
- struct ustcomm_connection *conn;
-
- /* add listener socket */
- multipoll_add(ent, app->server.listen_fd, POLLIN, process_mp_incoming_conn, &app->server, NULL);
-
- list_for_each_entry(conn, &app->server.connections, list) {
- struct ustcomm_multipoll_conn_info *mpinfo = (struct ustcomm_multipoll_conn_info *) zmalloc(sizeof(struct ustcomm_multipoll_conn_info));
- mpinfo->conn = conn;
- mpinfo->cb = cb;
- multipoll_add(ent, conn->fd, POLLIN, process_mp_conn_msg, mpinfo, free_ustcomm_client_poll);
- }
-}
diff --git a/libustcomm/ustcomm.h b/libustcomm/ustcomm.h
index f96ca16..f3c07b6 100644
--- a/libustcomm/ustcomm.h
+++ b/libustcomm/ustcomm.h
@@ -23,73 +23,62 @@
#include <urcu/list.h>
#include <ust/kcompat/kcompat.h>
-#include "multipoll.h"
#define SOCK_DIR "/tmp/ust-app-socks"
#define UST_SIGNAL SIGIO
-struct ustcomm_connection {
+struct ustcomm_sock {
struct list_head list;
int fd;
- /* Data that has not yet been consumed: */
- char *recv_buf;
- int recv_buf_size;
- int recv_buf_alloc;
+ int epoll_fd;
};
-/* ustcomm_server must be shallow-copyable */
-struct ustcomm_server {
- /* the "server" socket for serving the external requests */
- int listen_fd;
- char *socketpath;
-
- struct list_head connections;
-};
-
-struct ustcomm_ustd {
- struct ustcomm_server server;
+struct ustcomm_header {
+ int type;
+ long size;
+ int command;
+ int response;
+ int fd_included;
};
-struct ustcomm_app {
- struct ustcomm_server server;
-};
-/* ustcomm_source must be shallow-copyable */
-struct ustcomm_source {
- int fd;
- void *priv;
-};
+//int send_message_pid(pid_t pid, const char *msg, char **reply);
-struct ustcomm_multipoll_conn_info {
- struct ustcomm_connection *conn;
- int (*cb)(char *msg, struct ustcomm_source *src);
-};
+/* Ensure directory existence, useful for unix sockets */
+extern int ensure_dir_exists(const char *dir);
-//int send_message_pid(pid_t pid, const char *msg, char **reply);
-extern int ustcomm_request_consumer(pid_t pid, const char *channel);
+/* Create and delete sockets */
+extern struct ustcomm_sock * ustcomm_init_sock(int fd, int epoll_fd,
+ struct list_head *list);
+extern void ustcomm_del_sock(struct ustcomm_sock *sock, int keep_in_epoll);
-extern int ustcomm_ustd_recv_message(struct ustcomm_ustd *ustd, char **msg, struct ustcomm_source *src, int timeout);
-extern int ustcomm_app_recv_message(struct ustcomm_app *app, char **msg, struct ustcomm_source *src, int timeout);
+/* Create and delete named sockets */
+extern struct ustcomm_sock * ustcomm_init_named_socket(const char *name,
+ int epoll_fd);
+extern void ustcomm_del_named_sock(struct ustcomm_sock *sock,
+ int keep_socket_file);
-extern int ustcomm_init_app(pid_t pid, struct ustcomm_app *handle);
-extern void ustcomm_fini_app(struct ustcomm_app *handle, int keep_socket_file);
-extern void ustcomm_fini_ustd(struct ustcomm_ustd *handle);
+/* Send and receive functions for file descriptors */
+extern int ustcomm_send_fd(int sock, const struct ustcomm_header *header,
+ const char *data, int *fd);
+extern int ustcomm_recv_fd(int sock, struct ustcomm_header *header,
+ char **data, int *fd);
-extern int ustcomm_init_ustd(struct ustcomm_ustd *handle, const char *sock_path);
+/* Normal send and receive functions */
+extern int ustcomm_send(int sock, const struct ustcomm_header *header,
+ const char *data);
+extern int ustcomm_recv(int sock, struct ustcomm_header *header,
+ char **data);
-extern int ustcomm_connect_app(pid_t pid, struct ustcomm_connection *conn);
-extern int ustcomm_close_app(struct ustcomm_connection *conn);
-extern int ustcomm_connect_path(const char *path, struct ustcomm_connection *conn, pid_t signalpid);
-extern int ustcomm_send_request(struct ustcomm_connection *conn, const char *req, char **reply);
-extern int ustcomm_send_reply(struct ustcomm_server *server, char *msg, struct ustcomm_source *src);
-extern int ustcomm_disconnect(struct ustcomm_connection *conn);
-extern int ustcomm_close_all_connections(struct ustcomm_server *server);
-extern void ustcomm_mp_add_app_clients(struct mpentries *ent, struct ustcomm_app *app, int (*cb)(char *recvbuf, struct ustcomm_source *src));
+extern int ustcomm_request_consumer(pid_t pid, const char *channel);
+extern int ustcomm_connect_app(pid_t pid, int *app_fd);
+extern int ustcomm_connect_path(const char *path, int *connection_fd);
+extern int ustcomm_send_request(int sock, const char *req, char **reply);
+extern int ustcomm_send_reply(char *msg, int sock);
+extern int recv_message_conn(int sock, char **msg);
extern int nth_token_is(const char *str, const char *token, int tok_no);
extern char *nth_token(const char *str, int tok_no);
-extern int pid_is_online(pid_t);
-
#endif /* USTCOMM_H */
diff --git a/libustd/libustd.c b/libustd/libustd.c
index 999e4da..cb5b123 100644
--- a/libustd/libustd.c
+++ b/libustd/libustd.c
@@ -18,7 +18,10 @@
#define _GNU_SOURCE
+#include <sys/epoll.h>
#include <sys/shm.h>
+#include <sys/types.h>
+#include <sys/stat.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
@@ -64,7 +67,8 @@ int get_subbuffer(struct buffer_info *buf)
retval = -1;
goto end;
}
- result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+
+ result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) {
DBG("app died while being traced");
retval = GET_SUBBUF_DIED;
@@ -84,20 +88,14 @@ int get_subbuffer(struct buffer_info *buf)
goto end_rep;
}
- if(!strcmp(rep_code, "OK")) {
+ if (!strcmp(rep_code, "OK")) {
DBG("got subbuffer %s", buf->name);
retval = GET_SUBBUF_OK;
- }
- else if(nth_token_is(received_msg, "END", 0) == 1) {
- retval = GET_SUBBUF_DONE;
- goto end_rep;
- }
- else if(!strcmp(received_msg, "NOTFOUND")) {
+ } else if(!strcmp(received_msg, "NOTFOUND")) {
DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
retval = GET_SUBBUF_DIED;
goto end_rep;
- }
- else {
+ } else {
DBG("error getting subbuffer %s", buf->name);
retval = -1;
}
@@ -129,7 +127,7 @@ int put_subbuffer(struct buffer_info *buf)
retval = -1;
goto end;
}
- result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+ result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) {
retval = PUT_SUBBUF_DIED;
goto end;
@@ -200,6 +198,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
char *received_msg;
int result;
struct shmid_ds shmds;
+ struct ustcomm_header header;
buf = (struct buffer_info *) zmalloc(sizeof(struct buffer_info));
if(buf == NULL) {
@@ -207,18 +206,12 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
return NULL;
}
- buf->conn = malloc(sizeof(struct ustcomm_connection));
- if(buf->conn == NULL) {
- ERR("add_buffer: insufficient memory");
- free(buf);
- return NULL;
- }
-
buf->name = bufname;
buf->pid = pid;
+ /* FIXME: Fix the freeing and exit sequence of this function */
/* connect to app */
- result = ustcomm_connect_app(buf->pid, buf->conn);
+ result = ustcomm_connect_app(buf->pid, &buf->app_sock);
if(result) {
WARN("unable to connect to process, it probably died before we were able to connect");
return NULL;
@@ -229,7 +222,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
ERR("connect_buffer : asprintf failed (get_pidunique)");
return NULL;
}
- result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+ result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
free(send_msg);
if(result == -1) {
ERR("problem in ustcomm_send_request(get_pidunique)");
@@ -253,7 +246,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
buf->name);
return NULL;
}
- result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+ result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
free(send_msg);
if(result == -1) {
ERR("problem in ustcomm_send_request(get_shmid)");
@@ -277,7 +270,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
buf->name);
return NULL;
}
- result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+ result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
free(send_msg);
if(result == -1) {
ERR("problem in ustcomm_send_request(g_n_subbufs)");
@@ -301,7 +294,7 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
buf->name);
return NULL;
}
- result = ustcomm_send_request(buf->conn, send_msg, &received_msg);
+ result = ustcomm_send_request(buf->app_sock, send_msg, &received_msg);
free(send_msg);
if(result == -1) {
ERR("problem in ustcomm_send_request(get_subbuf_size)");
@@ -342,6 +335,32 @@ struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid,
}
buf->memlen = shmds.shm_segsz;
+ /* get buffer pipe fd */
+ memset(&header, 0, sizeof(header));
+ if (asprintf(&send_msg, "get_buffer_fd %s", buf->name) < 0) {
+ ERR("connect_buffer : asprintf failed (get_buffer_fd %s)",
+ buf->name);
+ return NULL;
+ }
+ header.size = strlen(send_msg) + 1;
+ result = ustcomm_send(buf->app_sock, &header, send_msg);
+ free(send_msg);
+ if (result <= 0) {
+ ERR("ustcomm_send failed.");
+ return NULL;
+ }
+ result = ustcomm_recv_fd(buf->app_sock, &header, NULL, &buf->pipe_fd);
+ if (result <= 0) {
+ ERR("ustcomm_recv_fd failed");
+ return NULL;
+ } else {
+ struct stat temp;
+ fstat(buf->pipe_fd, &temp);
+ if (!S_ISFIFO(temp.st_mode)) {
+ ERR("Didn't receive a fifo from the app");
+ return NULL;
+ }
+ }
if(instance->callbacks->on_open_buffer)
instance->callbacks->on_open_buffer(instance->callbacks, buf);
@@ -361,7 +380,7 @@ static void destroy_buffer(struct libustd_callbacks *callbacks,
{
int result;
- result = ustcomm_close_app(buf->conn);
+ result = close(buf->app_sock);
if(result == -1) {
WARN("problem calling ustcomm_close_app");
}
@@ -379,28 +398,31 @@ static void destroy_buffer(struct libustd_callbacks *callbacks,
if(callbacks->on_close_buffer)
callbacks->on_close_buffer(callbacks, buf);
- free(buf->conn);
free(buf);
}
+/*
+ * Per-buffer consumer loop. Each iteration reads one byte from
+ * buf->pipe_fd. When a byte arrives, a subbuffer is requested with
+ * get_subbuffer(); when the read reports end-of-file or a connection
+ * error, the buffer is finished with finish_consuming_dead_subbuffer()
+ * and the loop is left.
+ */
int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf)
{
- int result;
+ int result, read_result;
+ char read_buf;
pthread_cleanup_push(decrement_active_buffers, instance);
for(;;) {
+ read_result = read(buf->pipe_fd, &read_buf, 1);
/* get the subbuffer */
- result = get_subbuffer(buf);
- if(result == -1) {
- ERR("error getting subbuffer");
- continue;
- }
- else if(result == GET_SUBBUF_DONE) {
- /* this is done */
- break;
- }
- else if(result == GET_SUBBUF_DIED) {
+ /*
+ * The second branch must test read_result, not result: result is
+ * only assigned when a byte was read, so on the other path it is
+ * uninitialized (first iteration) or stale (later iterations).
+ * read() returning 0 is end-of-file on the pipe.
+ */
+ if (read_result == 1) {
+ result = get_subbuffer(buf);
+ if(result == -1) {
+ ERR("error getting subbuffer");
+ continue;
+ } else if (result == GET_SUBBUF_DIED) {
+ finish_consuming_dead_subbuffer(instance->callbacks, buf);
+ break;
+ }
+ } else if ((read_result == -1 && (errno == ECONNRESET || errno == EPIPE)) ||
+ read_result == 0) {
+ DBG("App died while being traced");
finish_consuming_dead_subbuffer(instance->callbacks, buf);
break;
}
@@ -541,64 +563,86 @@ int start_consuming_buffer(
return 0;
}
+/*
+ * Handle one text command received from a client connection.
+ *
+ * "collect <pid> <bufname>" parses the pid and buffer name and passes them
+ * to start_consuming_buffer(); "exit" is accepted and ignored (it only
+ * exists to make the poll loop return); anything else is logged as an
+ * unknown command.
+ *
+ * recvbuf:  command string; it is only read here, never freed.
+ * instance: ustd instance handed on to start_consuming_buffer().
+ */
+static void process_client_cmd(char *recvbuf, struct libustd_instance *instance)
+{
+	if(!strncmp(recvbuf, "collect", 7)) {
+		pid_t pid;
+		/*
+		 * Must start out NULL: sscanf() only stores an allocated
+		 * string here when the string conversion itself succeeds,
+		 * and the parse-error path below frees bufname regardless.
+		 */
+		char *bufname = NULL;
+		int result;
+
+		result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
+		if (result != 2) {
+			ERR("parsing error: %s", recvbuf);
+			goto free_bufname;
+		}
+
+		result = start_consuming_buffer(instance, pid, bufname);
+		if (result < 0) {
+			ERR("error in add_buffer");
+			goto free_bufname;
+		}
+
+		/*
+		 * NOTE(review): bufname is freed even on success, so this
+		 * assumes start_consuming_buffer() keeps its own copy --
+		 * confirm against its implementation.
+		 */
+	free_bufname:
+		free(bufname);
+	} else if(!strncmp(recvbuf, "exit", 4)) {
+		/* Only there to force poll to return */
+	} else {
+		WARN("unknown command: %s", recvbuf);
+	}
+}
+
+#define MAX_EVENTS 10
+/*
+ * Event loop of a ustd instance. Waits on instance->epoll_fd; an event on
+ * the listening socket accepts a new connection and registers it with
+ * ustcomm_init_sock(), an event on any other socket receives one message
+ * and passes it to process_client_cmd(). A receive result of 0 removes
+ * the connection with ustcomm_del_sock().
+ *
+ * Returns 1 if the instance has not been initialized.
+ */
int libustd_start_instance(struct libustd_instance *instance)
{
- int result;
- int timeout = -1;
+ struct ustcomm_sock *epoll_sock;
+ struct epoll_event events[MAX_EVENTS];
+ struct sockaddr addr;
+ int result, epoll_fd, accept_fd, nfds, i, addr_size, timeout;
if(!instance->is_init) {
ERR("libustd instance not initialized");
return 1;
}
+ epoll_fd = instance->epoll_fd;
+
+ timeout = -1;
/* app loop */
for(;;) {
- char *recvbuf;
-
- /* check for requests on our public socket */
- result = ustcomm_ustd_recv_message(instance->comm, &recvbuf, NULL, timeout);
- if(result == -1 && errno == EINTR) {
+ nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout);
+ if (nfds == -1 && errno == EINTR) {
/* Caught signal */
+ } else if (nfds == -1) {
+ ERR("epoll_wait");
}
- else if(result == -1) {
- ERR("error in ustcomm_ustd_recv_message");
- goto loop_end;
- }
- else if(result > 0) {
- if(!strncmp(recvbuf, "collect", 7)) {
- pid_t pid;
- char *bufname;
- int result;
-
- result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
- if(result != 2) {
- ERR("parsing error: %s", recvbuf);
- goto free_bufname;
- }
- result = start_consuming_buffer(instance, pid, bufname);
- if(result < 0) {
- ERR("error in add_buffer");
- goto free_bufname;
+ for (i = 0; i < nfds; ++i) {
+ epoll_sock = (struct ustcomm_sock *)events[i].data.ptr;
+ if (epoll_sock == instance->listen_sock) {
+ addr_size = sizeof(struct sockaddr);
+ accept_fd = accept(epoll_sock->fd,
+ &addr,
+ (socklen_t *)&addr_size);
+ if (accept_fd == -1) {
+ ERR("accept failed\n");
+ /* No descriptor to register; go on to the next event */
+ continue;
+ }
+ ustcomm_init_sock(accept_fd, epoll_fd,
+ &instance->connections);
+ } else {
+ char *msg = NULL;
+ result = recv_message_conn(epoll_sock->fd, &msg);
+ if (result == 0) {
+ ustcomm_del_sock(epoll_sock, 0);
+ } else if (msg) {
+ process_client_cmd(msg, instance);
+ free(msg);
}
- free_bufname:
- free(bufname);
- }
- else if(!strncmp(recvbuf, "exit", 4)) {
- /* Only there to force poll to return */
- }
- else {
- WARN("unknown command: %s", recvbuf);
}
-
- free(recvbuf);
}
- loop_end:
-
- if(instance->quit_program) {
+ if (instance->quit_program) {
pthread_mutex_lock(&instance->mutex);
if(instance->active_buffers == 0) {
pthread_mutex_unlock(&instance->mutex);
@@ -617,14 +661,16 @@ int libustd_start_instance(struct libustd_instance *instance)
return 0;
}
+/* FIXME: threads and connections !?
+ *
+ * Destroy a libustd instance. If the instance was initialized, the
+ * listening socket is handed to ustcomm_del_named_sock() and the epoll
+ * descriptor is closed; afterwards the mutex is destroyed and sock_path
+ * and the instance itself are freed. The instance must not be used after
+ * this call.
+ * NOTE(review): entries on instance->connections are not touched here,
+ * which is presumably what the FIXME refers to -- confirm.
+ */
void libustd_delete_instance(struct libustd_instance *instance)
{
- if(instance->is_init)
- ustcomm_fini_ustd(instance->comm);
+ if (instance->is_init) {
+ ustcomm_del_named_sock(instance->listen_sock, 0);
+ close(instance->epoll_fd);
+ }
pthread_mutex_destroy(&instance->mutex);
free(instance->sock_path);
- free(instance->comm);
free(instance);
}
@@ -669,17 +715,13 @@ int libustd_stop_instance(struct libustd_instance *instance, int send_msg)
return 0;
}
-struct libustd_instance *libustd_new_instance(
- struct libustd_callbacks *callbacks, char *sock_path)
+struct libustd_instance
+*libustd_new_instance(struct libustd_callbacks *callbacks,
+ char *sock_path)
{
struct libustd_instance *instance =
zmalloc(sizeof(struct libustd_instance));
- if(!instance)
- return NULL;
-
- instance->comm = malloc(sizeof(struct ustcomm_ustd));
- if(!instance->comm) {
- free(instance);
+ if(!instance) {
return NULL;
}
@@ -689,18 +731,75 @@ struct libustd_instance *libustd_new_instance(
instance->active_buffers = 0;
pthread_mutex_init(&instance->mutex, NULL);
- if(sock_path)
+ if (sock_path) {
instance->sock_path = strdup(sock_path);
- else
+ } else {
instance->sock_path = NULL;
+ }
return instance;
}
+/*
+ * Set up the communication endpoints of a ustd instance: build the socket
+ * path (instance->sock_path if set, otherwise SOCK_DIR "/ustd" after making
+ * sure SOCK_DIR exists), create the epoll descriptor, create the named
+ * listening socket registered on that descriptor, and initialize the
+ * instance's connection list.
+ *
+ * Returns 0 on success and -1 on any failure. On failure after the epoll
+ * descriptor was created, it is closed again.
+ */
+static int init_ustd_socket(struct libustd_instance *instance)
+{
+	char *sock_name;
+	int ret = -1;
+
+	if (!instance->sock_path) {
+		/* Only check if socket dir exists if we are using the default directory */
+		if (ensure_dir_exists(SOCK_DIR) == -1) {
+			ERR("Unable to create socket directory %s", SOCK_DIR);
+			return -1;
+		}
+
+		if (asprintf(&sock_name, "%s/%s", SOCK_DIR, "ustd") < 0) {
+			ERR("ustcomm_init_ustd : asprintf failed (%s/ustd)",
+			    SOCK_DIR);
+			return -1;
+		}
+	} else if (asprintf(&sock_name, "%s", instance->sock_path) < 0) {
+		ERR("ustcomm_init_ustd : asprintf failed (sock_path %s)",
+		    instance->sock_path);
+		return -1;
+	}
+
+	/* Set up epoll */
+	instance->epoll_fd = epoll_create(MAX_EVENTS);
+	if (instance->epoll_fd == -1) {
+		ERR("epoll_create failed, start instance bailing");
+		goto out_free_name;
+	}
+
+	/* Create the named socket */
+	instance->listen_sock = ustcomm_init_named_socket(sock_name,
+							  instance->epoll_fd);
+	if (instance->listen_sock) {
+		INIT_LIST_HEAD(&instance->connections);
+		ret = 0;
+	} else {
+		ERR("error initializing named socket at %s", sock_name);
+		close(instance->epoll_fd);
+	}
+
+out_free_name:
+	/* The path string is only needed while creating the socket */
+	free(sock_name);
+
+	return ret;
+}
+
int libustd_init_instance(struct libustd_instance *instance)
{
int result;
- result = ustcomm_init_ustd(instance->comm, instance->sock_path);
+ result = init_ustd_socket(instance);
if(result == -1) {
ERR("failed to initialize socket");
return 1;
--
1.7.1
More information about the lttng-dev
mailing list