summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmitay Isaacs <amitay@gmail.com>2016-09-16 16:13:18 +1000
committerAmitay Isaacs <amitay@samba.org>2016-12-18 14:23:22 +0100
commit75a25d13310c53328f2b99f856abc8c5d023bc52 (patch)
tree084da9f1ddd6ac94abfa1bc947e8b85f4b5bdfd7
parentf0ba41e1c5a9b4000b918211fc16c3545d2133ab (diff)
downloadsamba-75a25d13310c53328f2b99f856abc8c5d023bc52.tar.gz
ctdb-common: Add generic socket I/O
This is a generic socket read/write to be used in the ctdb daemon. It is based on ctdb_io.c and comm.c. Signed-off-by: Amitay Isaacs <amitay@gmail.com> Reviewed-by: Martin Schwenke <martin@meltin.net>
-rw-r--r--ctdb/common/sock_io.c309
-rw-r--r--ctdb/common/sock_io.h38
-rw-r--r--ctdb/wscript2
3 files changed, 348 insertions, 1 deletions
diff --git a/ctdb/common/sock_io.c b/ctdb/common/sock_io.c
new file mode 100644
index 00000000000..7245d4e973d
--- /dev/null
+++ b/ctdb/common/sock_io.c
@@ -0,0 +1,309 @@
+/*
+ Generic Unix-domain Socket I/O
+
+ Copyright (C) Amitay Isaacs 2016
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+#include "system/network.h"
+
+#include <talloc.h>
+#include <tevent.h>
+
+#include "lib/util/sys_rw.h"
+#include "lib/util/debug.h"
+#include "lib/util/blocking.h"
+
+#include "common/logging.h"
+#include "common/sock_io.h"
+
+int sock_connect(const char *sockpath)
+{
+ struct sockaddr_un addr;
+ size_t len;
+ int fd, ret;
+
+ if (sockpath == NULL) {
+ D_ERR("Invalid socket path\n");
+ return -1;
+ }
+
+ memset(&addr, 0, sizeof(addr));
+ addr.sun_family = AF_UNIX;
+ len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
+ if (len >= sizeof(addr.sun_path)) {
+ D_ERR("Socket path too long, len=%zu\n", strlen(sockpath));
+ return -1;
+ }
+
+ fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (fd == -1) {
+ D_ERR("socket() failed, errno=%d\n", errno);
+ close(fd);
+ return -1;
+ }
+
+ ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
+ if (ret == -1) {
+ D_ERR("connect() failed, errno=%d\n", errno);
+ close(fd);
+ return -1;
+ }
+
+ return fd;
+}
+
+struct sock_queue {
+ struct tevent_context *ev;
+ sock_queue_callback_fn_t callback;
+ void *private_data;
+ int fd;
+
+ struct tevent_immediate *im;
+ struct tevent_queue *queue;
+ struct tevent_fd *fde;
+ uint8_t *buf;
+ size_t buflen, offset;
+};
+
+static bool sock_queue_set_fd(struct sock_queue *queue, int fd);
+static int sock_queue_destructor(struct sock_queue *queue);
+static void sock_queue_handler(struct tevent_context *ev,
+ struct tevent_fd *fde, uint16_t flags,
+ void *private_data);
+static void sock_queue_process(struct sock_queue *queue);
+static void sock_queue_process_event(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_data);
+
+struct sock_queue *sock_queue_setup(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int fd,
+ sock_queue_callback_fn_t callback,
+ void *private_data)
+{
+ struct sock_queue *queue;
+
+ queue = talloc_zero(mem_ctx, struct sock_queue);
+ if (queue == NULL) {
+ return NULL;
+ }
+
+ queue->ev = ev;
+ queue->callback = callback;
+ queue->private_data = private_data;
+
+ queue->im = tevent_create_immediate(queue);
+ if (queue->im == NULL) {
+ talloc_free(queue);
+ return NULL;
+ }
+
+ queue->queue = tevent_queue_create(queue, "out-queue");
+ if (queue->queue == NULL) {
+ talloc_free(queue);
+ return NULL;
+ }
+
+ if (! sock_queue_set_fd(queue, fd)) {
+ talloc_free(queue);
+ return NULL;
+ }
+
+ talloc_set_destructor(queue, sock_queue_destructor);
+
+ return queue;
+}
+
+static bool sock_queue_set_fd(struct sock_queue *queue, int fd)
+{
+ TALLOC_FREE(queue->fde);
+ queue->fd = fd;
+
+ if (fd != -1) {
+ int ret;
+
+ ret = set_blocking(fd, false);
+ if (ret != 0) {
+ return false;
+ }
+
+ queue->fde = tevent_add_fd(queue->ev, queue, fd,
+ TEVENT_FD_READ,
+ sock_queue_handler, queue);
+ if (queue->fde == NULL) {
+ return false;
+ }
+ tevent_fd_set_auto_close(queue->fde);
+ }
+
+ return true;
+}
+
+static int sock_queue_destructor(struct sock_queue *queue)
+{
+ TALLOC_FREE(queue->fde);
+ queue->fd = -1;
+
+ return 0;
+}
+
+static void sock_queue_handler(struct tevent_context *ev,
+ struct tevent_fd *fde, uint16_t flags,
+ void *private_data)
+{
+ struct sock_queue *queue = talloc_get_type_abort(
+ private_data, struct sock_queue);
+ int ret, num_ready;
+ ssize_t nread;
+
+ ret = ioctl(queue->fd, FIONREAD, &num_ready);
+ if (ret != 0) {
+ /* Ignore */
+ return;
+ }
+
+ if (num_ready == 0) {
+ /* descriptor has been closed */
+ goto fail;
+ }
+
+ if (num_ready > queue->buflen - queue->offset) {
+ queue->buf = talloc_realloc_size(queue, queue->buf,
+ queue->offset + num_ready);
+ if (queue->buf == NULL) {
+ goto fail;
+ }
+ queue->buflen = queue->offset + num_ready;
+ }
+
+ nread = sys_read(queue->fd, queue->buf + queue->offset, num_ready);
+ if (nread < 0) {
+ goto fail;
+ }
+ queue->offset += nread;
+
+ sock_queue_process(queue);
+ return;
+
+fail:
+ queue->callback(NULL, 0, queue->private_data);
+}
+
+static void sock_queue_process(struct sock_queue *queue)
+{
+ uint32_t pkt_size;
+
+ if (queue->offset < sizeof(uint32_t)) {
+ /* not enough data */
+ return;
+ }
+
+ pkt_size = *(uint32_t *)queue->buf;
+ if (pkt_size == 0) {
+ D_ERR("Invalid packet of length 0\n");
+ queue->callback(NULL, 0, queue->private_data);
+ }
+
+ if (queue->offset < pkt_size) {
+ /* not enough data */
+ return;
+ }
+
+ queue->callback(queue->buf, pkt_size, queue->private_data);
+ queue->offset += pkt_size;
+
+ if (queue->offset < queue->buflen) {
+ /* more data to be processed */
+ tevent_schedule_immediate(queue->im, queue->ev,
+ sock_queue_process_event, queue);
+ } else {
+ TALLOC_FREE(queue->buf);
+ queue->buflen = 0;
+ queue->offset = 0;
+ }
+}
+
+static void sock_queue_process_event(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_data)
+{
+ struct sock_queue *queue = talloc_get_type_abort(
+ private_data, struct sock_queue);
+
+ sock_queue_process(queue);
+}
+
+struct sock_queue_write_state {
+ uint8_t *pkt;
+ uint32_t pkt_size;
+};
+
+static void sock_queue_trigger(struct tevent_req *req, void *private_data);
+
+int sock_queue_write(struct sock_queue *queue, uint8_t *buf, size_t buflen)
+{
+ struct tevent_req *req;
+ struct sock_queue_write_state *state;
+ bool status;
+
+ if (buflen >= INT32_MAX) {
+ return -1;
+ }
+
+ req = tevent_req_create(queue, &state, struct sock_queue_write_state);
+ if (req == NULL) {
+ return -1;
+ }
+
+ state->pkt = buf;
+ state->pkt_size = (uint32_t)buflen;
+
+ status = tevent_queue_add_entry(queue->queue, queue->ev, req,
+ sock_queue_trigger, queue);
+ if (! status) {
+ talloc_free(req);
+ return -1;
+ }
+
+ return 0;
+}
+
+static void sock_queue_trigger(struct tevent_req *req, void *private_data)
+{
+ struct sock_queue *queue = talloc_get_type_abort(
+ private_data, struct sock_queue);
+ struct sock_queue_write_state *state = tevent_req_data(
+ req, struct sock_queue_write_state);
+ size_t offset = 0;
+
+ do {
+ ssize_t nwritten;
+
+ nwritten = sys_write(queue->fd, state->pkt + offset,
+ state->pkt_size - offset);
+ if (nwritten < 0) {
+ queue->callback(NULL, 0, queue->private_data);
+ return;
+ }
+ offset += nwritten;
+
+ } while (offset < state->pkt_size);
+
+ tevent_req_done(req);
+ talloc_free(req);
+}
diff --git a/ctdb/common/sock_io.h b/ctdb/common/sock_io.h
new file mode 100644
index 00000000000..cbb83a89ef2
--- /dev/null
+++ b/ctdb/common/sock_io.h
@@ -0,0 +1,38 @@
+/*
+ Generic Socket I/O
+
+ Copyright (C) Amitay Isaacs 2016
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __CTDB_SOCK_IO_H__
+#define __CTDB_SOCK_IO_H__
+
+typedef void (*sock_queue_callback_fn_t)(uint8_t *buf, size_t buflen,
+ void *private_data);
+
+struct sock_queue;
+
+int sock_connect(const char *sockpath);
+
+struct sock_queue *sock_queue_setup(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int fd,
+ sock_queue_callback_fn_t callback,
+ void *private_data);
+
+int sock_queue_write(struct sock_queue *queue, uint8_t *buf, size_t buflen);
+
+#endif /* __CTDB_SOCK_IO_H__ */
diff --git a/ctdb/wscript b/ctdb/wscript
index aa357c5d89e..96a852ffc7a 100644
--- a/ctdb/wscript
+++ b/ctdb/wscript
@@ -381,7 +381,7 @@ def build(bld):
bld.SAMBA_SUBSYSTEM('ctdb-common',
source=bld.SUBDIR('common',
'''ctdb_io.c ctdb_util.c ctdb_ltdb.c
- '''),
+ sock_io.c'''),
includes='include',
deps='replace popt talloc tevent tdb popt ctdb-system')