[ltt-dev] [UST PATCH] Re-write ustcomm parts of UST

Nils Carlson nils.carlson at ericsson.com
Mon Sep 27 04:49:33 EDT 2010


Thanks for the comments! Great for a tired coder...
If I don't reply to one of your comments, I agree with it. :-)

On Fri, 24 Sep 2010, David Goulet wrote:

> Comments below.
>
> On 10-09-23 06:13 AM, Nils Carlson wrote:
>> +
>> +
>> +
>> +#define MAX_EVENTS 10
>
> Why 10? If I understand correctly, there is 1 ustcomm_sock per traced app,
> so with more than 10 apps, will it refuse connections?

No. The size hint passed to epoll_create is in fact ignored by modern epoll
implementations (according to the man pages), so in practice MAX_EVENTS just
tells epoll_wait the maximum number of events to return per call. If more
events are ready than that, they stay pending, and epoll_wait returns them
immediately when the loop starts over. 10 is pretty much just a wild guess. :-)

>> +
>> +
>> +
>>   void *listener_main(void *p)
>>   {
>> -     int result;
>> +     struct ustcomm_sock *epoll_sock;
>> +     struct epoll_event events[MAX_EVENTS];
>> +     struct sockaddr addr;
>> +     int accept_fd, nfds, result, i, addr_size;
>>
>>       DBG("LISTENER");
>>
>>       pthread_cleanup_push(listener_cleanup, NULL);
>>
>> -     for (;;) {
>> -             struct mpentries mpent;
>> -
>> -             multipoll_init(&mpent);
>> -
>> -             blocked_consumers_add_to_mp(&mpent);
>> -             ustcomm_mp_add_app_clients(&mpent,&ustcomm_app, process_client_cmd);
>> -
>> -             result = multipoll_poll(&mpent, -1);
>> -             if (result == -1) {
>> -                     ERR("error in multipoll_poll");
>> +     for(;;) {
>> +             nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
>> +             if (nfds == -1) {
>> +                     ERR("epoll_wait");
>> +                     continue;
>
> Until the end of time?

Yep, to be honest we should never end up here. Then again, maybe reaching
this point is an indication that we should bail out? These cases are
difficult to handle... We have blocked all signals, so we shouldn't be
receiving any (and hence epoll_wait should never fail with EINTR).

>>               }
>>
>> -             multipoll_destroy(&mpent);
>> +             for (i = 0; i<  nfds; i++) {
>> +                     epoll_sock = (struct ustcomm_sock *)events[i].data.ptr;
>> +                     if (epoll_sock == listen_sock) {
>> +                             addr_size = sizeof(struct sockaddr);
>> +                             accept_fd = accept(epoll_sock->fd,
>> +                                             &addr,
>> +                                                (socklen_t *)&addr_size);
>> +                             if (accept_fd == -1) {
>> +                                     ERR("accept failed\n");
>
> I would definitely bail out here if accept_fd is invalid, rather than
> initiating communication on the next line.

Great! Really missed this.
>
>> +                             }
>> +                             ustcomm_init_sock(accept_fd, epoll_fd,
>> +                                             &ust_socks);
>> +                     } else {
>> +                             char *msg = NULL;
>> +                             result = recv_message_conn(epoll_sock->fd,&msg);
>> +                             if (result == 0) {
>> +                                     ustcomm_del_sock(epoll_sock, 0);
>> +                             } else if (msg) {
>> +                                     process_client_cmd(msg, epoll_sock->fd);
>> +                                     free(msg);
>> +                             }
>> +                     }
>> +             }
>>       }
>>

....

>> +void ustcomm_del_named_sock(struct ustcomm_sock *sock,
>> +                         int keep_socket_file)
>
> Maybe this function should return an error code? In cases like the
> malloc of sockaddr failing, you just return, but the sock is actually
> still there...

Maybe, but at that point, what can you do? It's like a free call: you're
not going to use the socket anymore. What I think I'll do, though, is make
it goto the freeing of the struct, so that you still remove it from the list
and free the memory used by the ustcomm_sock.


>>   {
>> -     int result;
>> +     int result, fd;
>> +     struct stat st;
>> +     struct sockaddr dummy;
>> +     struct sockaddr_un *sockaddr = NULL;
>> +     int alloc_size;
>>
>> -     /* 1. Check if there is a message in the buf */
>> -     /* 2. If not, do:
>> -           2.1 receive chunk and put it in buffer
>> -        2.2 process full message if there is one
>> -        -- while no message arrived
>> -     */
>> +     fd = sock->fd;
>>
>> -     for(;;) {
>> -             int i;
>> -             int nulfound = 0;
>> +     if(!keep_socket_file) {
>>
>> -             /* Search for full message in buffer */
>> -             for(i=0; i<*recv_buf_size; i++) {
>> -                     if((*recv_buf)[i] == '\0') {
>> -                             nulfound = 1;
>> -                             break;
>> -                     }
>> +             /* Get the socket name */
>> +             alloc_size = sizeof(dummy);
>> +             if (getsockname(fd,&dummy, (socklen_t *)&alloc_size)<  0) {
>> +                     PERROR("getsockname failed");
>> +                     return;
>>               }
>>
>> -             /* Process found message */
>> -             if(nulfound == 1) {
>> -                     char *newbuf;
>> -
>> -                     if(i == 0) {
>> -                             /* problem */
>> -                             WARN("received empty message");
>> -                     }
>> -                     *msg = strndup(*recv_buf, i);
>> -
>> -                     /* Remove processed message from buffer */
>> -                     newbuf = (char *) malloc(*recv_buf_size - (i+1));
>> -                     memcpy(newbuf, *recv_buf + (i+1), *recv_buf_size - (i+1));
>> -                     free(*recv_buf);
>> -                     *recv_buf = newbuf;
>> -                     *recv_buf_size -= (i+1);
>> -                     *recv_buf_alloc -= (i+1);
>> -
>> -                     return 1;
>> +             sockaddr = malloc(alloc_size);
>
> zmalloc
>
>> +             if (!sockaddr) {
>> +                     ERR("failed to allocate sockaddr");
>> +                     return;
>>               }
>>
>> -             /* Receive a chunk from the fd */
>> -             if(*recv_buf_alloc - *recv_buf_size<  RECV_INCREMENT) {
>> -                     *recv_buf_alloc += RECV_INCREMENT - (*recv_buf_alloc - *recv_buf_size);
>> -                     *recv_buf = (char *) realloc(*recv_buf, *recv_buf_alloc);
>> +             if (getsockname(fd, sockaddr, (socklen_t *)&alloc_size)<  0) {
>> +                     PERROR("getsockname failed");
>> +                     goto free_sockaddr;
>>               }
>>
>> -             result = recv(fd, *recv_buf+*recv_buf_size, RECV_INCREMENT, 0);
>> +             /* Destroy socket */
>> +             result = stat(sockaddr->sun_path,&st);
>>               if(result == -1) {
>> -                     if(errno == ECONNRESET) {
>> -                             *recv_buf_size = 0;
>> -                             return 0;
>> -                     }
>> -                     else if(errno == EINTR) {
>> -                             return -1;
>> -                     }
>> -                     else {
>> -                             PERROR("recv");
>> -                             return -1;
>> -                     }
>> +                     PERROR("stat (%s)", sockaddr->sun_path);
>> +                     goto free_sockaddr;
>>               }
>> -             if(result == 0) {
>> -                     return 0;
>> +
>> +             /* Paranoid check before deleting. */
>> +             result = S_ISSOCK(st.st_mode);
>> +             if(!result) {
>> +                     ERR("The socket we are about to delete is not a socket.");
>> +                     goto free_sockaddr;
>>               }
>> -             *recv_buf_size += result;
>>
>> -             /* Go back to the beginning to check if there is a full message in the buffer */
>> +             result = unlink(sockaddr->sun_path);
>> +             if(result == -1) {
>> +                     PERROR("unlink");
>> +             }
>>       }
>>
>> -     DBG("received message \"%s\"", *recv_buf);
>> +     ustcomm_del_sock(sock, keep_socket_file);
>>
>> -     return 1;
>> +free_sockaddr:
>> +     free(sockaddr);
>>
>>   }

...

> Valgrind fails here. Do you know if this is a bug in glibc or valgrind?
>

This is something with valgrind; Google investigated it and couldn't find
the problem. Possibly it's glibc as well, who knows.

>> +     result = sendmsg(sock,&msg, MSG_NOSIGNAL);
>> +     if (result<  0&&  errno != EPIPE) {
>> +             PERROR("sendmsg failed");
>> +     }
>> +     return result;
>>   }
>>

...

>> -             *path_out = strdup(addr.sun_path);
>> +     result = ustcomm_send(sock, req_header, data);
>> +     if ( result<= 0) {
>> +             return result;
>>       }
>>
>> -     return fd;
>> +     if (!response) {
>> +             return 1;
>> +     }
>>
>> -     close_sock:
>> -     close(fd);
>> +     return ustcomm_recv(sock,
>> +                     &res_header,
>> +                         response);
>
> Maybe oneliner here?

No, it's weirder than it looks. Basically, some messages don't get
responses; in those cases the response parameter is NULL. I'm going to
remove this and force responses to all messages in one of the coming
patches. So nothing to worry about. :-)

>>
>> -     return -1;
>>   }

...

>> +                 buf->name);
>> +             return NULL;
>> +     }
>> +     header.size = strlen(send_msg) + 1;
>> +     result = ustcomm_send(buf->app_sock,&header, send_msg);
>> +     free(send_msg);
>> +     if (result<= 0) {
>
> "<=" ?? Is this right?
>

Yep: a negative value means there was a problem sending or receiving, and 0
means the socket was closed. Both are passed up as I/O errors.

I wouldn't worry too much about this right now; as I said, a lot of libustd
will have to be rewritten, so this will be taken care of.

>> +             ERR("ustcomm_send failed.");
>> +             return NULL;
>> +     }
>> +     result = ustcomm_recv_fd(buf->app_sock,&header, NULL,&buf->pipe_fd);
>> +     if (result<= 0) {
>
> Same here
>

Same here.

....

>> +static void process_client_cmd(char *recvbuf, struct libustd_instance *instance)
>> +{
>> +     if(!strncmp(recvbuf, "collect", 7)) {
>> +             pid_t pid;
>> +             char *bufname;
>> +             int result;
>> +
>> +             result = sscanf(recvbuf, "%*s %d %50as",&pid,&bufname);
>> +             if (result != 2) {
>> +                     ERR("parsing error: %s", recvbuf);
>> +                     goto free_bufname;
>
> Do we have the assurance that bufname has been allocated here when going
> to free_bufname ?

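No, actually we don't. I think we need an explicit NULL check here (which
also means initializing bufname to NULL, since sscanf leaves it unassigned
when parsing fails); I'll add it. Does anybody else have any opinions? This
is one of the reasons I want to get rid of all these pesky string allocations.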

>> +             }
>> +
>> +             result = start_consuming_buffer(instance, pid, bufname);
>> +             if (result<  0) {
>> +                     ERR("error in add_buffer");
>> +                     goto free_bufname;
>> +             }
>> +
>> +     free_bufname:
>> +             free(bufname);
>> +     } else if(!strncmp(recvbuf, "exit", 4)) {
>> +             /* Only there to force poll to return */
>> +     } else {
>> +             WARN("unknown command: %s", recvbuf);
>> +     }
>> +}
>> +
>> +#define MAX_EVENTS 10
>
> Wasn't previously defined... maybe define this globally ?

Actually, I'm considering increasing this one. :-) Ustd could be seeing a
lot more traffic when lots of applications are being traced. The two
MAX_EVENTS definitions should probably be independent.

>>
>>   int libustd_start_instance(struct libustd_instance *instance)
>>   {
>> -     int result;
>> -     int timeout = -1;
>> +     struct ustcomm_sock *epoll_sock;
>> +     struct epoll_event events[MAX_EVENTS];
>> +     struct sockaddr addr;
>> +     int result, epoll_fd, accept_fd, nfds, i, addr_size, timeout;
>>
>>       if(!instance->is_init) {
>>               ERR("libustd instance not initialized");
>>               return 1;
>>       }
>> +     epoll_fd = instance->epoll_fd;
>> +
>> +     timeout = -1;
>>
>>       /* app loop */
>>       for(;;) {
>> -             char *recvbuf;
>> -
>> -             /* check for requests on our public socket */
>> -             result = ustcomm_ustd_recv_message(instance->comm,&recvbuf, NULL, timeout);
>> -             if(result == -1&&  errno == EINTR) {
>> +             nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout);
>> +             if (nfds == -1&&  errno == EINTR) {
>>                       /* Caught signal */
>> +             } else if (nfds == -1) {
>> +                     ERR("epoll_wait");
>
> Should there be a continue here ?

yes!

>>               }
>> -             else if(result == -1) {
>> -                     ERR("error in ustcomm_ustd_recv_message");
>> -                     goto loop_end;
>> -             }
>> -             else if(result>  0) {
>> -                     if(!strncmp(recvbuf, "collect", 7)) {
>> -                             pid_t pid;
>> -                             char *bufname;
>> -                             int result;
>> -
>> -                             result = sscanf(recvbuf, "%*s %d %50as",&pid,&bufname);
>> -                             if(result != 2) {
>> -                                     ERR("parsing error: %s", recvbuf);
>> -                                     goto free_bufname;
>> -                             }
>>
>> -                             result = start_consuming_buffer(instance, pid, bufname);
>> -                             if(result<  0) {
>> -                                     ERR("error in add_buffer");
>> -                                     goto free_bufname;
>> +             for (i = 0; i<  nfds; ++i) {
>> +                     epoll_sock = (struct ustcomm_sock *)events[i].data.ptr;
>> +                     if (epoll_sock == instance->listen_sock) {
>> +                             addr_size = sizeof(struct sockaddr);
>> +                             accept_fd = accept(epoll_sock->fd,
>> +                                             &addr,
>> +                                                (socklen_t *)&addr_size);
>> +                             if (accept_fd == -1) {
>> +                                     ERR("accept failed\n");
>
> Same as before: continuing even if accept_fd is bad?
>

fixed.

>> +                             }
>> +                             ustcomm_init_sock(accept_fd, epoll_fd,
>> +                                             &instance->connections);
>> +                     } else {
>> +                             char *msg = NULL;
>> +                             result = recv_message_conn(epoll_sock->fd,&msg);
>> +                             if (result == 0) {
>> +                                     ustcomm_del_sock(epoll_sock, 0);
>> +                             } else if (msg) {
>> +                                     process_client_cmd(msg, instance);
>> +                                     free(msg);
>>                               }
>>
>> -                             free_bufname:
>> -                             free(bufname);
>> -                     }
>> -                     else if(!strncmp(recvbuf, "exit", 4)) {
>> -                             /* Only there to force poll to return */
>> -                     }
>> -                     else {
>> -                             WARN("unknown command: %s", recvbuf);
>>                       }
>> -
>> -                     free(recvbuf);
>>               }
>>
>> -             loop_end:
>> -
>> -             if(instance->quit_program) {
>> +             if (instance->quit_program) {
>>                       pthread_mutex_lock(&instance->mutex);
>>                       if(instance->active_buffers == 0) {
>>                               pthread_mutex_unlock(&instance->mutex);
>> @@ -617,14 +661,16 @@ int libustd_start_instance(struct libustd_instance *instance)
>>       return 0;
>>   }
>>
>> +/* FIXME: threads and connections !? */
>>   void libustd_delete_instance(struct libustd_instance *instance)
>>   {
>> -     if(instance->is_init)
>> -             ustcomm_fini_ustd(instance->comm);
>> +     if (instance->is_init) {
>> +             ustcomm_del_named_sock(instance->listen_sock, 0);
>> +             close(instance->epoll_fd);
>> +     }
>>
>>       pthread_mutex_destroy(&instance->mutex);
>>       free(instance->sock_path);
>> -     free(instance->comm);
>>       free(instance);
>>   }
>>
>> @@ -669,17 +715,13 @@ int libustd_stop_instance(struct libustd_instance *instance, int send_msg)
>>       return 0;
>>   }
>>
>> -struct libustd_instance *libustd_new_instance(
>> -     struct libustd_callbacks *callbacks, char *sock_path)
>> +struct libustd_instance
>> +*libustd_new_instance(struct libustd_callbacks *callbacks,
>> +                   char *sock_path)
>>   {
>>       struct libustd_instance *instance =
>>               zmalloc(sizeof(struct libustd_instance));
>> -     if(!instance)
>> -             return NULL;
>> -
>> -     instance->comm = malloc(sizeof(struct ustcomm_ustd));
>> -     if(!instance->comm) {
>> -             free(instance);
>> +     if(!instance) {
>
> if (!instance) ==> just syntax...

These are for another time...

> OUfff... LONG patch ;) Over for now. I think I did what I could ;)


Yep.

/Nils





More information about the lttng-dev mailing list