/* Generic Unix-domain Socket I/O Copyright (C) Amitay Isaacs 2016 This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, see . */ #include "replace.h" #include "system/filesys.h" #include "system/network.h" #include #include #include "lib/util/sys_rw.h" #include "lib/util/debug.h" #include "lib/util/blocking.h" #include "common/logging.h" #include "common/sock_io.h" bool sock_clean(const char *sockpath) { int ret; ret = unlink(sockpath); if (ret == 0) { D_WARNING("Removed stale socket %s\n", sockpath); } else if (errno != ENOENT) { D_ERR("Failed to remove stale socket %s\n", sockpath); return false; } return true; } int sock_connect(const char *sockpath) { struct sockaddr_un addr; size_t len; int fd, ret; if (sockpath == NULL) { D_ERR("Invalid socket path\n"); return -1; } memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path)); if (len >= sizeof(addr.sun_path)) { D_ERR("Socket path too long, len=%zu\n", strlen(sockpath)); return -1; } fd = socket(AF_UNIX, SOCK_STREAM, 0); if (fd == -1) { D_ERR("socket() failed, errno=%d\n", errno); return -1; } ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); if (ret == -1) { D_ERR("connect() failed, errno=%d\n", errno); close(fd); return -1; } return fd; } struct sock_queue { struct tevent_context *ev; sock_queue_callback_fn_t callback; void *private_data; int fd; struct tevent_immediate *im; struct tevent_queue *queue; struct tevent_fd *fde; uint8_t *buf; size_t buflen, begin, end; }; /* * The reserved talloc headers, SOCK_QUEUE_OBJ_COUNT, * and the pre-allocated pool-memory SOCK_QUEUE_POOL_SIZE, * are used for the sub-objects queue->im, queue->queue, queue->fde * and queue->buf. * If the memory allocating sub-objects of struct sock_queue change, * those values need to be adjusted. */ #define SOCK_QUEUE_OBJ_COUNT 4 #define SOCK_QUEUE_POOL_SIZE 2048 static bool sock_queue_set_fd(struct sock_queue *queue, int fd); static void sock_queue_handler(struct tevent_context *ev, struct tevent_fd *fde, uint16_t flags, void *private_data); static void sock_queue_process(struct sock_queue *queue); static void sock_queue_process_event(struct tevent_context *ev, struct tevent_immediate *im, void *private_data); struct sock_queue *sock_queue_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd, sock_queue_callback_fn_t callback, void *private_data) { struct sock_queue *queue; queue = talloc_pooled_object(mem_ctx, struct sock_queue, SOCK_QUEUE_OBJ_COUNT, SOCK_QUEUE_POOL_SIZE); if (queue == NULL) { return NULL; } memset(queue, 0, sizeof(struct sock_queue)); queue->ev = ev; queue->callback = callback; queue->private_data = private_data; queue->im = tevent_create_immediate(queue); if (queue->im == NULL) { talloc_free(queue); return NULL; } queue->queue = tevent_queue_create(queue, "out-queue"); if (queue->queue == NULL) { talloc_free(queue); return NULL; } if (! sock_queue_set_fd(queue, fd)) { talloc_free(queue); return NULL; } return queue; } static bool sock_queue_set_fd(struct sock_queue *queue, int fd) { TALLOC_FREE(queue->fde); queue->fd = fd; if (fd != -1) { int ret; ret = set_blocking(fd, false); if (ret != 0) { return false; } queue->fde = tevent_add_fd(queue->ev, queue, fd, TEVENT_FD_READ, sock_queue_handler, queue); if (queue->fde == NULL) { return false; } tevent_fd_set_auto_close(queue->fde); } return true; } static void sock_queue_handler(struct tevent_context *ev, struct tevent_fd *fde, uint16_t flags, void *private_data) { struct sock_queue *queue = talloc_get_type_abort( private_data, struct sock_queue); int ret, num_ready; ssize_t nread; ret = ioctl(queue->fd, FIONREAD, &num_ready); if (ret != 0) { /* Ignore */ return; } if (num_ready == 0) { /* descriptor has been closed */ goto fail; } if ((size_t)num_ready > queue->buflen - queue->end) { queue->buf = talloc_realloc_size(queue, queue->buf, queue->end + num_ready); if (queue->buf == NULL) { goto fail; } queue->buflen = queue->end + num_ready; } nread = sys_read(queue->fd, queue->buf + queue->end, num_ready); if (nread < 0) { goto fail; } queue->end += nread; sock_queue_process(queue); return; fail: queue->callback(NULL, 0, queue->private_data); } static void sock_queue_process(struct sock_queue *queue) { uint32_t pkt_size; if ((queue->end - queue->begin) < sizeof(uint32_t)) { /* not enough data */ return; } pkt_size = *(uint32_t *)(queue->buf + queue->begin); if (pkt_size == 0) { D_ERR("Invalid packet of length 0\n"); queue->callback(NULL, 0, queue->private_data); return; } if ((queue->end - queue->begin) < pkt_size) { /* not enough data */ return; } queue->callback(queue->buf + queue->begin, pkt_size, queue->private_data); queue->begin += pkt_size; if (queue->begin < queue->end) { /* more data to be processed */ tevent_schedule_immediate(queue->im, queue->ev, sock_queue_process_event, queue); } else { TALLOC_FREE(queue->buf); queue->buflen = 0; queue->begin = 0; queue->end = 0; } } static void sock_queue_process_event(struct tevent_context *ev, struct tevent_immediate *im, void *private_data) { struct sock_queue *queue = talloc_get_type_abort( private_data, struct sock_queue); sock_queue_process(queue); } struct sock_queue_write_state { uint8_t *pkt; uint32_t pkt_size; }; static void sock_queue_trigger(struct tevent_req *req, void *private_data); int sock_queue_write(struct sock_queue *queue, uint8_t *buf, size_t buflen) { struct tevent_req *req; struct sock_queue_write_state *state; struct tevent_queue_entry *qentry; if (buflen >= INT32_MAX) { return -1; } req = tevent_req_create(queue, &state, struct sock_queue_write_state); if (req == NULL) { return -1; } state->pkt = buf; state->pkt_size = (uint32_t)buflen; qentry = tevent_queue_add_entry(queue->queue, queue->ev, req, sock_queue_trigger, queue); if (qentry == NULL) { talloc_free(req); return -1; } return 0; } static void sock_queue_trigger(struct tevent_req *req, void *private_data) { struct sock_queue *queue = talloc_get_type_abort( private_data, struct sock_queue); struct sock_queue_write_state *state = tevent_req_data( req, struct sock_queue_write_state); size_t offset = 0; do { ssize_t nwritten; nwritten = sys_write(queue->fd, state->pkt + offset, state->pkt_size - offset); if (nwritten < 0) { queue->callback(NULL, 0, queue->private_data); return; } offset += nwritten; } while (offset < state->pkt_size); tevent_req_done(req); talloc_free(req); }