diff options
author | Amitay Isaacs <amitay@gmail.com> | 2016-09-16 16:13:18 +1000 |
---|---|---|
committer | Amitay Isaacs <amitay@samba.org> | 2016-12-18 14:23:22 +0100 |
commit | 75a25d13310c53328f2b99f856abc8c5d023bc52 (patch) | |
tree | 084da9f1ddd6ac94abfa1bc947e8b85f4b5bdfd7 | |
parent | f0ba41e1c5a9b4000b918211fc16c3545d2133ab (diff) | |
download | samba-75a25d13310c53328f2b99f856abc8c5d023bc52.tar.gz |
ctdb-common: Add generic socket I/O
This is a generic socket read/write to be used in the ctdb daemon.
It is based on ctdb_io.c and comm.c.
Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
-rw-r--r-- | ctdb/common/sock_io.c | 309 | ||||
-rw-r--r-- | ctdb/common/sock_io.h | 38 | ||||
-rw-r--r-- | ctdb/wscript | 2 |
3 files changed, 348 insertions, 1 deletions
diff --git a/ctdb/common/sock_io.c b/ctdb/common/sock_io.c new file mode 100644 index 00000000000..7245d4e973d --- /dev/null +++ b/ctdb/common/sock_io.c @@ -0,0 +1,309 @@ +/* + Generic Unix-domain Socket I/O + + Copyright (C) Amitay Isaacs 2016 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/filesys.h" +#include "system/network.h" + +#include <talloc.h> +#include <tevent.h> + +#include "lib/util/sys_rw.h" +#include "lib/util/debug.h" +#include "lib/util/blocking.h" + +#include "common/logging.h" +#include "common/sock_io.h" + +int sock_connect(const char *sockpath) +{ + struct sockaddr_un addr; + size_t len; + int fd, ret; + + if (sockpath == NULL) { + D_ERR("Invalid socket path\n"); + return -1; + } + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path)); + if (len >= sizeof(addr.sun_path)) { + D_ERR("Socket path too long, len=%zu\n", strlen(sockpath)); + return -1; + } + + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd == -1) { + D_ERR("socket() failed, errno=%d\n", errno); + close(fd); + return -1; + } + + ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); + if (ret == -1) { + D_ERR("connect() failed, errno=%d\n", errno); + close(fd); + return -1; + } + + return fd; +} + +struct sock_queue { + struct tevent_context *ev; + sock_queue_callback_fn_t callback; + void *private_data; + int fd; + + struct tevent_immediate *im; + struct tevent_queue *queue; + struct tevent_fd *fde; + uint8_t *buf; + size_t buflen, offset; +}; + +static bool sock_queue_set_fd(struct sock_queue *queue, int fd); +static int sock_queue_destructor(struct sock_queue *queue); +static void sock_queue_handler(struct tevent_context *ev, + struct tevent_fd *fde, uint16_t flags, + void *private_data); +static void sock_queue_process(struct sock_queue *queue); +static void sock_queue_process_event(struct tevent_context *ev, + struct tevent_immediate *im, + void *private_data); + +struct sock_queue *sock_queue_setup(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + int fd, + sock_queue_callback_fn_t callback, + void *private_data) +{ + struct sock_queue *queue; + + queue = talloc_zero(mem_ctx, struct sock_queue); + if (queue == NULL) { + return NULL; + } + + queue->ev = ev; + queue->callback = callback; + queue->private_data = private_data; + + queue->im = tevent_create_immediate(queue); + if (queue->im == NULL) { + talloc_free(queue); + return NULL; + } + + queue->queue = tevent_queue_create(queue, "out-queue"); + if (queue->queue == NULL) { + talloc_free(queue); + return NULL; + } + + if (! sock_queue_set_fd(queue, fd)) { + talloc_free(queue); + return NULL; + } + + talloc_set_destructor(queue, sock_queue_destructor); + + return queue; +} + +static bool sock_queue_set_fd(struct sock_queue *queue, int fd) +{ + TALLOC_FREE(queue->fde); + queue->fd = fd; + + if (fd != -1) { + int ret; + + ret = set_blocking(fd, false); + if (ret != 0) { + return false; + } + + queue->fde = tevent_add_fd(queue->ev, queue, fd, + TEVENT_FD_READ, + sock_queue_handler, queue); + if (queue->fde == NULL) { + return false; + } + tevent_fd_set_auto_close(queue->fde); + } + + return true; +} + +static int sock_queue_destructor(struct sock_queue *queue) +{ + TALLOC_FREE(queue->fde); + queue->fd = -1; + + return 0; +} + +static void sock_queue_handler(struct tevent_context *ev, + struct tevent_fd *fde, uint16_t flags, + void *private_data) +{ + struct sock_queue *queue = talloc_get_type_abort( + private_data, struct sock_queue); + int ret, num_ready; + ssize_t nread; + + ret = ioctl(queue->fd, FIONREAD, &num_ready); + if (ret != 0) { + /* Ignore */ + return; + } + + if (num_ready == 0) { + /* descriptor has been closed */ + goto fail; + } + + if (num_ready > queue->buflen - queue->offset) { + queue->buf = talloc_realloc_size(queue, queue->buf, + queue->offset + num_ready); + if (queue->buf == NULL) { + goto fail; + } + queue->buflen = queue->offset + num_ready; + } + + nread = sys_read(queue->fd, queue->buf + queue->offset, num_ready); + if (nread < 0) { + goto fail; + } + queue->offset += nread; + + sock_queue_process(queue); + return; + +fail: + queue->callback(NULL, 0, queue->private_data); +} + +static void sock_queue_process(struct sock_queue *queue) +{ + uint32_t pkt_size; + + if (queue->offset < sizeof(uint32_t)) { + /* not enough data */ + return; + } + + pkt_size = *(uint32_t *)queue->buf; + if (pkt_size == 0) { + D_ERR("Invalid packet of length 0\n"); + queue->callback(NULL, 0, queue->private_data); + } + + if (queue->offset < pkt_size) { + /* not enough data */ + return; + } + + queue->callback(queue->buf, pkt_size, queue->private_data); + queue->offset += pkt_size; + + if (queue->offset < queue->buflen) { + /* more data to be processed */ + tevent_schedule_immediate(queue->im, queue->ev, + sock_queue_process_event, queue); + } else { + TALLOC_FREE(queue->buf); + queue->buflen = 0; + queue->offset = 0; + } +} + +static void sock_queue_process_event(struct tevent_context *ev, + struct tevent_immediate *im, + void *private_data) +{ + struct sock_queue *queue = talloc_get_type_abort( + private_data, struct sock_queue); + + sock_queue_process(queue); +} + +struct sock_queue_write_state { + uint8_t *pkt; + uint32_t pkt_size; +}; + +static void sock_queue_trigger(struct tevent_req *req, void *private_data); + +int sock_queue_write(struct sock_queue *queue, uint8_t *buf, size_t buflen) +{ + struct tevent_req *req; + struct sock_queue_write_state *state; + bool status; + + if (buflen >= INT32_MAX) { + return -1; + } + + req = tevent_req_create(queue, &state, struct sock_queue_write_state); + if (req == NULL) { + return -1; + } + + state->pkt = buf; + state->pkt_size = (uint32_t)buflen; + + status = tevent_queue_add_entry(queue->queue, queue->ev, req, + sock_queue_trigger, queue); + if (! status) { + talloc_free(req); + return -1; + } + + return 0; +} + +static void sock_queue_trigger(struct tevent_req *req, void *private_data) +{ + struct sock_queue *queue = talloc_get_type_abort( + private_data, struct sock_queue); + struct sock_queue_write_state *state = tevent_req_data( + req, struct sock_queue_write_state); + size_t offset = 0; + + do { + ssize_t nwritten; + + nwritten = sys_write(queue->fd, state->pkt + offset, + state->pkt_size - offset); + if (nwritten < 0) { + queue->callback(NULL, 0, queue->private_data); + return; + } + offset += nwritten; + + } while (offset < state->pkt_size); + + tevent_req_done(req); + talloc_free(req); +} diff --git a/ctdb/common/sock_io.h b/ctdb/common/sock_io.h new file mode 100644 index 00000000000..cbb83a89ef2 --- /dev/null +++ b/ctdb/common/sock_io.h @@ -0,0 +1,38 @@ +/* + Generic Socket I/O + + Copyright (C) Amitay Isaacs 2016 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __CTDB_SOCK_IO_H__ +#define __CTDB_SOCK_IO_H__ + +typedef void (*sock_queue_callback_fn_t)(uint8_t *buf, size_t buflen, + void *private_data); + +struct sock_queue; + +int sock_connect(const char *sockpath); + +struct sock_queue *sock_queue_setup(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + int fd, + sock_queue_callback_fn_t callback, + void *private_data); + +int sock_queue_write(struct sock_queue *queue, uint8_t *buf, size_t buflen); + +#endif /* __CTDB_SOCK_IO_H__ */ diff --git a/ctdb/wscript b/ctdb/wscript index aa357c5d89e..96a852ffc7a 100644 --- a/ctdb/wscript +++ b/ctdb/wscript @@ -381,7 +381,7 @@ def build(bld): bld.SAMBA_SUBSYSTEM('ctdb-common', source=bld.SUBDIR('common', '''ctdb_io.c ctdb_util.c ctdb_ltdb.c - '''), + sock_io.c'''), includes='include', deps='replace popt talloc tevent tdb popt ctdb-system') |