[ltt-dev] [UST PATCH] RFC - introduce a simple protocol format for ustcomm

Nils Carlson nils.carlson at ericsson.com
Wed Sep 15 09:56:45 EDT 2010


This is an embryo of what I'm currently working on.
Currently it adds a header specifying message size to
ustcomm communication, allowing correct allocation from
the beginning.

In the next step my plan is to start moving commands into
enums and clean up ustcomms handling to use epoll.

Signed-off-by: Nils Carlson <nils.carlson at ericsson.com>
---
 libustcomm/ustcomm.c      |  243 ++++++++++++++++++++++++---------------------
 libustcomm/ustcomm.h      |    7 ++
 tests/test-nevents/prog.c |    1 +
 3 files changed, 138 insertions(+), 113 deletions(-)

diff --git a/libustcomm/ustcomm.c b/libustcomm/ustcomm.c
index 567c5d1..3c8ff39 100644
--- a/libustcomm/ustcomm.c
+++ b/libustcomm/ustcomm.c
@@ -107,36 +107,6 @@ int pid_is_online(pid_t pid) {
 	return 1;
 }
 
-/* Send a message
- *
- * @fd: file descriptor to send to
- * @msg: a null-terminated string containing the message to send
- *
- * Return value:
- * -1: error
- * 0: connection closed
- * 1: success
- */
-
-static int send_message_fd(int fd, const char *msg)
-{
-	int result;
-
-	/* Send including the final \0 */
-	result = patient_send(fd, msg, strlen(msg)+1, MSG_NOSIGNAL);
-	if(result == -1) {
-		if(errno != EPIPE)
-			PERROR("send");
-		return -1;
-	}
-	else if(result == 0) {
-		return 0;
-	}
-
-	DBG("sent message \"%s\"", msg);
-	return 1;
-}
-
 /* Called by an app to ask the consumer daemon to connect to it. */
 
 int ustcomm_request_consumer(pid_t pid, const char *channel)
@@ -200,101 +170,97 @@ int ustcomm_request_consumer(pid_t pid, const char *channel)
 #define RECV_INCREMENT 1000
 #define RECV_INITIAL_BUF_SIZE 10
 
-static int recv_message_fd(int fd, char **recv_buf, int *recv_buf_size, int *recv_buf_alloc, char **msg)
+static int _recv_message_conn(struct ustcomm_connection *conn,
+			      struct ustcomm_header *req_header,
+			      char **data)
 {
 	int result;
+	int retval;
 
-	/* 1. Check if there is a message in the buf */
-	/* 2. If not, do:
-           2.1 receive chunk and put it in buffer
-	   2.2 process full message if there is one
-	   -- while no message arrived
-	*/
-
-	for(;;) {
-		int i;
-		int nulfound = 0;
-
-		/* Search for full message in buffer */
-		for(i=0; i<*recv_buf_size; i++) {
-			if((*recv_buf)[i] == '\0') {
-				nulfound = 1;
-				break;
-			}
-		}
-
-		/* Process found message */
-		if(nulfound == 1) {
-			char *newbuf;
-
-			if(i == 0) {
-				/* problem */
-				WARN("received empty message");
-			}
-			*msg = strndup(*recv_buf, i);
-
-			/* Remove processed message from buffer */
-			newbuf = (char *) malloc(*recv_buf_size - (i+1));
-			memcpy(newbuf, *recv_buf + (i+1), *recv_buf_size - (i+1));
-			free(*recv_buf);
-			*recv_buf = newbuf;
-			*recv_buf_size -= (i+1);
-			*recv_buf_alloc -= (i+1);
-
-			return 1;
-		}
-
-		/* Receive a chunk from the fd */
-		if(*recv_buf_alloc - *recv_buf_size < RECV_INCREMENT) {
-			*recv_buf_alloc += RECV_INCREMENT - (*recv_buf_alloc - *recv_buf_size);
-			*recv_buf = (char *) realloc(*recv_buf, *recv_buf_alloc);
+	result = recv(conn->fd, req_header, sizeof(struct ustcomm_header),
+		      MSG_NOSIGNAL);
+	if (result <= 0) {
+		if(errno == ECONNRESET) {
+			return 0;
+		} else if (errno == EINTR) {
+			return -1;
+		} else if (result < 0) {
+			PERROR("recv");
+			return -1;
 		}
+		return 0;
+	}
 
-		result = recv(fd, *recv_buf+*recv_buf_size, RECV_INCREMENT, 0);
-		if(result == -1) {
+	if (req_header->size > 0) {
+		*data = malloc(req_header->size);
+		result = recv(conn->fd, *data, req_header->size,
+			      MSG_NOSIGNAL);
+		if (result <= 0) {
 			if(errno == ECONNRESET) {
-				*recv_buf_size = 0;
-				return 0;
-			}
-			else if(errno == EINTR) {
-				return -1;
-			}
-			else {
+				retval = 0;
+			} else if (errno == EINTR) {
+				retval = -1;
+			} else if (result < 0) {
 				PERROR("recv");
-				return -1;
+				retval = -1;
+			} else {
+				retval = 0;
 			}
+			free(*data);
+			return retval;
 		}
-		if(result == 0) {
-			return 0;
-		}
-		*recv_buf_size += result;
-
-		/* Go back to the beginning to check if there is a full message in the buffer */
 	}
+}
 
-	DBG("received message \"%s\"", *recv_buf);
-
-	return 1;
+static int recv_message_conn(struct ustcomm_connection *conn, char **msg)
+{
+	struct ustcomm_header req_header;
 
+	return _recv_message_conn(conn, &req_header, msg);
 }
 
-static int recv_message_conn(struct ustcomm_connection *conn, char **msg)
+int ustcomm_send_res(struct ustcomm_server *server,
+		     const struct ustcomm_header *res_header,
+		     const char *data,
+		     struct ustcomm_source *src)
 {
-	return recv_message_fd(conn->fd, &conn->recv_buf, &conn->recv_buf_size, &conn->recv_buf_alloc, msg);
+	int result;
+
+	result = patient_send(src->fd, (char *)res_header,
+			      sizeof(struct ustcomm_header), MSG_NOSIGNAL);
+	if (result <= 0) {
+		if (errno != EPIPE) {
+			ERR("send");
+		}
+		return -1;
+	}
+
+	if (res_header->size > 0) {
+		result = patient_send(src->fd, data, res_header->size, MSG_NOSIGNAL);
+		if (result <= 0) {
+			if(errno != EPIPE) {
+				ERR("send");
+			}
+			return -1;
+		}
+	}
 }
 
 int ustcomm_send_reply(struct ustcomm_server *server, char *msg, struct ustcomm_source *src)
 {
 	int result;
+	struct ustcomm_header res_header;
+
+	res_header.size = strlen(msg) + 1;
 
-	result = send_message_fd(src->fd, msg);
+	result = ustcomm_send_res(server, &res_header, msg, src);
 	if(result < 0) {
-		ERR("error in send_message_fd");
+		ERR("error in ustcomm_send_res");
 		return -1;
 	}
 
 	return 0;
-} 
+}
 
 /* Called after a fork. */
 
@@ -517,6 +483,61 @@ static int init_named_socket(const char *name, char **path_out)
 	return -1;
 }
 
+int ustcomm_send_req(struct ustcomm_connection *conn,
+		     const struct ustcomm_header *req_header,
+		     const char *data,
+		     char **response)
+{
+	int result;
+	struct ustcomm_header res_header;
+
+	result = patient_send(conn->fd, (char *)req_header,
+			      sizeof(struct ustcomm_header), MSG_NOSIGNAL);
+	if (result <= 0) {
+		if (errno != EPIPE) {
+			ERR("sending header");
+		}
+		return -1;
+	}
+	if (req_header->size > 0) {
+		result = patient_send(conn->fd, data, req_header->size, MSG_NOSIGNAL);
+		if (result <= 0) {
+			if(errno != EPIPE) {
+				ERR("sending data");
+			}
+			return -1;
+		}
+	}
+
+	if (!response) {
+		return 0;
+	}
+
+	result = recv(conn->fd, &res_header, sizeof(struct ustcomm_header),
+		      MSG_NOSIGNAL);
+	if (result <= 0) {
+		if(errno != EPIPE) {
+			ERR("receiving header");
+		}
+		return -1;
+	}
+
+	if (res_header.size > 0) {
+		*response = malloc(res_header.size);
+		result = recv(conn->fd, *response, res_header.size,
+			      MSG_NOSIGNAL);
+		if (result <= 0) {
+			if(errno != EPIPE) {
+				ERR("receiving data");
+			}
+			free(*response);
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
 /*
  * Return value:
  *   0: Success, but no reply because recv() returned 0
@@ -531,23 +552,19 @@ int ustcomm_send_request(struct ustcomm_connection *conn, const char *req, char
 {
 	int result;
 
-	/* Send including the final \0 */
-	result = send_message_fd(conn->fd, req);
-	if(result != 1)
-		return result;
+	struct ustcomm_header req_header;
 
-	if(!reply)
-		return 1;
+	req_header.size = strlen(req) + 1;
 
-	result = recv_message_conn(conn, reply);
-	if(result == -1) {
-		return -1;
-	}
-	else if(result == 0) {
-		return 0;
+	result = ustcomm_send_req(conn,
+				  &req_header,
+				  req,
+				  reply);
+	if (result == 0) {
+		return 1;
+	} else {
+		return result;
 	}
-	
-	return 1;
 }
 
 /* Return value:
diff --git a/libustcomm/ustcomm.h b/libustcomm/ustcomm.h
index f96ca16..724a9c0 100644
--- a/libustcomm/ustcomm.h
+++ b/libustcomm/ustcomm.h
@@ -28,6 +28,13 @@
 #define SOCK_DIR "/tmp/ust-app-socks"
 #define UST_SIGNAL SIGIO
 
+struct ustcomm_header {
+	int type;
+	long size;
+	int command;
+	int response;
+};
+
 struct ustcomm_connection {
 	struct list_head list;
 	int fd;
diff --git a/tests/test-nevents/prog.c b/tests/test-nevents/prog.c
index b2350cc..18fda0e 100644
--- a/tests/test-nevents/prog.c
+++ b/tests/test-nevents/prog.c
@@ -30,6 +30,7 @@ int main()
 	int i;
 
 	for(i=0; i<N_ITER; i++) {
+		sleep(10);
 		trace_mark(ust, an_event, "%d", i);
 		trace_mark(ust, another_event, "%s", "Hello, World!");
 	}
-- 
1.7.1





More information about the lttng-dev mailing list