diff options
author | Amitay Isaacs <amitay@gmail.com> | 2015-04-04 20:23:44 +1100 |
---|---|---|
committer | Amitay Isaacs <amitay@samba.org> | 2015-10-07 14:53:29 +0200 |
commit | 1543eedb8feaa85336216aa22df2145522425184 (patch) | |
tree | 2ee364582c7a7e84c6bf2e8e2714bdd1e9910a21 /ctdb | |
parent | e01c0eed38335e7b421ab4f79410f08ab1d31482 (diff) | |
download | samba-1543eedb8feaa85336216aa22df2145522425184.tar.gz |
ctdb-common: Add communication endpoint abstraction
Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
Diffstat (limited to 'ctdb')
-rw-r--r-- | ctdb/common/comm.c | 404 | ||||
-rw-r--r-- | ctdb/common/comm.h | 101 | ||||
-rwxr-xr-x | ctdb/tests/cunit/comm_test_001.sh | 7 | ||||
-rwxr-xr-x | ctdb/tests/cunit/comm_test_002.sh | 24 | ||||
-rw-r--r-- | ctdb/tests/src/comm_client_test.c | 207 | ||||
-rw-r--r-- | ctdb/tests/src/comm_server_test.c | 363 | ||||
-rw-r--r-- | ctdb/tests/src/comm_test.c | 260 | ||||
-rwxr-xr-x | ctdb/wscript | 5 |
8 files changed, 1370 insertions, 1 deletions
diff --git a/ctdb/common/comm.c b/ctdb/common/comm.c new file mode 100644 index 00000000000..1bbb46050b5 --- /dev/null +++ b/ctdb/common/comm.c @@ -0,0 +1,404 @@ +/* + Communication endpoint implementation + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" +#include "system/filesys.h" + +#include <talloc.h> +#include <tdb.h> + +#include "lib/util/tevent_unix.h" + +#include "pkt_read.h" +#include "pkt_write.h" +#include "comm.h" + +static bool set_nonblocking(int fd) +{ + int v; + + v = fcntl(fd, F_GETFL, 0); + if (v == -1) { + return false; + } + if (fcntl(fd, F_SETFL, v | O_NONBLOCK) == -1) { + return false; + } + return true; +} + +/* + * Communication endpoint around a socket + */ + +#define SMALL_PKT_SIZE 1024 + +struct comm_context { + int fd; + comm_read_handler_fn read_handler; + void *read_private_data; + comm_dead_handler_fn dead_handler; + void *dead_private_data; + uint8_t small_pkt[SMALL_PKT_SIZE]; + struct tevent_req *read_req, *write_req; + struct tevent_fd *fde; + struct tevent_queue *queue; +}; + +static void comm_fd_handler(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, void *private_data); +static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct comm_context *comm, + uint8_t *buf, size_t buflen); +static void comm_read_failed(struct tevent_req *req); + + +int comm_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd, + comm_read_handler_fn read_handler, void *read_private_data, + comm_dead_handler_fn dead_handler, void *dead_private_data, + struct comm_context **result) +{ + struct comm_context *comm; + + if (fd < 0) { + return EINVAL; + } + + if (dead_handler == NULL) { + return EINVAL; + } + + /* Socket queue relies on non-blocking sockets. */ + if (!set_nonblocking(fd)) { + return EIO; + } + + comm = talloc_zero(mem_ctx, struct comm_context); + if (comm == NULL) { + return ENOMEM; + } + + comm->fd = fd; + comm->read_handler = read_handler; + comm->read_private_data = read_private_data; + comm->dead_handler = dead_handler; + comm->dead_private_data = dead_private_data; + + comm->queue = tevent_queue_create(comm, "comm write queue"); + if (comm->queue == NULL) { + goto fail; + } + + /* Set up to write packets */ + comm->fde = tevent_add_fd(ev, comm, fd, TEVENT_FD_READ, + comm_fd_handler, comm); + if (comm->fde == NULL) { + goto fail; + } + + /* Set up to read packets */ + if (read_handler != NULL) { + struct tevent_req *req; + + req = comm_read_send(comm, ev, comm, comm->small_pkt, + SMALL_PKT_SIZE); + if (req == NULL) { + goto fail; + } + + tevent_req_set_callback(req, comm_read_failed, comm); + comm->read_req = req; + } + + *result = comm; + return 0; + +fail: + talloc_free(comm); + return ENOMEM; +} + + +/* + * Read packets + */ + +struct comm_read_state { + struct tevent_context *ev; + struct comm_context *comm; + uint8_t *buf; + size_t buflen; + struct tevent_req *subreq; +}; + +static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data); +static void comm_read_done(struct tevent_req *subreq); + +static struct tevent_req *comm_read_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct comm_context *comm, + uint8_t *buf, size_t buflen) +{ + struct tevent_req *req, *subreq; + struct comm_read_state *state; + + req = tevent_req_create(mem_ctx, &state, struct comm_read_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->comm = comm; + state->buf = buf; + state->buflen = buflen; + + subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t), + state->buf, state->buflen, + comm_read_more, NULL); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + state->subreq = subreq; + + tevent_req_set_callback(subreq, comm_read_done, req); + return req; +} + +static ssize_t comm_read_more(uint8_t *buf, size_t buflen, void *private_data) +{ + uint32_t packet_len; + + if (buflen < sizeof(uint32_t)) { + return sizeof(uint32_t) - buflen; + } + + packet_len = *(uint32_t *)buf; + + return packet_len - buflen; +} + +static void comm_read_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct comm_read_state *state = tevent_req_data( + req, struct comm_read_state); + struct comm_context *comm = state->comm; + ssize_t nread; + uint8_t *buf; + bool free_buf; + int err = 0; + + nread = pkt_read_recv(subreq, state, &buf, &free_buf, &err); + TALLOC_FREE(subreq); + state->subreq = NULL; + if (nread == -1) { + tevent_req_error(req, err); + return; + } + + comm->read_handler(buf, nread, comm->read_private_data); + + if (free_buf) { + talloc_free(buf); + } + + subreq = pkt_read_send(state, state->ev, comm->fd, sizeof(uint32_t), + state->buf, state->buflen, + comm_read_more, NULL); + if (tevent_req_nomem(subreq, req)) { + return; + } + state->subreq = subreq; + + tevent_req_set_callback(subreq, comm_read_done, req); +} + +static void comm_read_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + } +} + +static void comm_read_failed(struct tevent_req *req) +{ + struct comm_context *comm = tevent_req_callback_data( + req, struct comm_context); + + comm_read_recv(req, NULL); + TALLOC_FREE(req); + comm->read_req = NULL; + if (comm->dead_handler != NULL) { + comm->dead_handler(comm->dead_private_data); + } +} + + +/* + * Write packets + */ + +struct comm_write_state { + struct tevent_context *ev; + struct comm_context *comm; + struct tevent_req *subreq; + uint8_t *buf; + size_t buflen, nwritten; +}; + +static void comm_write_trigger(struct tevent_req *req, void *private_data); +static void comm_write_done(struct tevent_req *subreq); + +struct tevent_req *comm_write_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct comm_context *comm, + uint8_t *buf, size_t buflen) +{ + struct tevent_req *req; + struct comm_write_state *state; + + req = tevent_req_create(mem_ctx, &state, struct comm_write_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->comm = comm; + state->buf = buf; + state->buflen = buflen; + + if (!tevent_queue_add_entry(comm->queue, ev, req, + comm_write_trigger, NULL)) { + talloc_free(req); + return NULL; + } + + return req; +} + +static void comm_write_trigger(struct tevent_req *req, void *private_data) +{ + struct comm_write_state *state = tevent_req_data( + req, struct comm_write_state); + struct comm_context *comm = state->comm; + struct tevent_req *subreq; + + comm->write_req = req; + + subreq = pkt_write_send(state, state->ev, comm->fd, + state->buf, state->buflen); + if (tevent_req_nomem(subreq, req)) { + return; + } + + state->subreq = subreq; + tevent_req_set_callback(subreq, comm_write_done, req); + TEVENT_FD_WRITEABLE(comm->fde); +} + +static void comm_write_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct comm_write_state *state = tevent_req_data( + req, struct comm_write_state); + struct comm_context *comm = state->comm; + ssize_t nwritten; + int err = 0; + + TEVENT_FD_NOT_WRITEABLE(comm->fde); + nwritten = pkt_write_recv(subreq, &err); + TALLOC_FREE(subreq); + state->subreq = NULL; + comm->write_req = NULL; + if (nwritten == -1) { + if (err == EPIPE) { + comm->dead_handler(comm->dead_private_data); + } + tevent_req_error(req, err); + return; + } + + state->nwritten = nwritten; + tevent_req_done(req); +} + +bool comm_write_recv(struct tevent_req *req, int *perr) +{ + struct comm_write_state *state = tevent_req_data( + req, struct comm_write_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + if (state->nwritten != state->buflen) { + *perr = EIO; + return false; + } + + *perr = 0; + return true; +} + +static void comm_fd_handler(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, void *private_data) +{ + struct comm_context *comm = talloc_get_type_abort( + private_data, struct comm_context); + + if (flags & TEVENT_FD_READ) { + struct comm_read_state *read_state; + + if (comm->read_req == NULL) { + /* This should never happen */ + abort(); + } + + read_state = tevent_req_data(comm->read_req, + struct comm_read_state); + pkt_read_handler(ev, fde, flags, read_state->subreq); + } + + if (flags & TEVENT_FD_WRITE) { + struct comm_write_state *write_state; + + if (comm->write_req == NULL) { + /* This should never happen */ + abort(); + } + + write_state = tevent_req_data(comm->write_req, + struct comm_write_state); + pkt_write_handler(ev, fde, flags, write_state->subreq); + } +} diff --git a/ctdb/common/comm.h b/ctdb/common/comm.h new file mode 100644 index 00000000000..27021e945af --- /dev/null +++ b/ctdb/common/comm.h @@ -0,0 +1,101 @@ +/* + Communication endpoint API + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __CTDB_COMM_H__ +#define __CTDB_COMM_H__ + +#include <talloc.h> +#include <tevent.h> + +/** + * @file comm.h + * + * @brief Communication over a socket or file descriptor + * + * This abstraction is a wrapper around a socket or file descriptor to + * send/receive complete packets. + */ + +/** + * @brief Packet handler function + * + * This function is registered while setting up communication endpoint. Any + * time packets are read, this function is called. + */ +typedef void (*comm_read_handler_fn)(uint8_t *buf, size_t buflen, + void *private_data); + +/** + * @brief Communication endpoint dead handler function + * + * This function is called when the communication endpoint is closed. + */ +typedef void (*comm_dead_handler_fn)(void *private_data); + +/** + * @brief Abstract struct to store communication endpoint details + */ +struct comm_context; + +/** + * @brief Initialize the communication endpoint + * + * This return a new communication context. Freeing this context will free all + * memory assoicated with it. + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] fd The socket or file descriptor + * @param[in] read_handler The packet handler function + * @param[in] read_private_data Private data for read handler function + * @param[in] dead_handler The communication dead handler function + * @param[in] dead_private_data Private data for dead handler function + * @param[out] result The new comm_context structure + * @return 0 on success, errno on failure + */ +int comm_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd, + comm_read_handler_fn read_handler, void *read_private_data, + comm_dead_handler_fn dead_handler, void *dead_private_data, + struct comm_context **result); + +/** + * @brief Async computation start to send a packet + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] comm Communication context + * @param[in] buf The packet data + * @param[in] buflen The size of the packet + * @return new tevent request, or NULL on failure + */ +struct tevent_req *comm_write_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct comm_context *comm, + uint8_t *buf, size_t buflen); + +/** + * @brief Async computation end to send a packet + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool comm_write_recv(struct tevent_req *req, int *perr); + +#endif /* __CTDB_COMM_H__ */ diff --git a/ctdb/tests/cunit/comm_test_001.sh b/ctdb/tests/cunit/comm_test_001.sh new file mode 100755 index 00000000000..5d20db2289d --- /dev/null +++ b/ctdb/tests/cunit/comm_test_001.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +. "${TEST_SCRIPTS_DIR}/unit.sh" + +ok "100 2048 500 4096 1024 8192 200 16384 300 32768 400 65536 1048576 " + +unit_test comm_test diff --git a/ctdb/tests/cunit/comm_test_002.sh b/ctdb/tests/cunit/comm_test_002.sh new file mode 100755 index 00000000000..76ee62d1a24 --- /dev/null +++ b/ctdb/tests/cunit/comm_test_002.sh @@ -0,0 +1,24 @@ +#!/bin/sh + +. "${TEST_SCRIPTS_DIR}/unit.sh" + +socket="${TEST_VAR_DIR}/test_sock.$$" +num_clients=10 + +remove_socket () +{ + rm -f "$socket" +} + +test_cleanup remove_socket + +ok_null + +unit_test comm_server_test "$socket" $num_clients & +pid=$! + +for i in $(seq 1 $num_clients) ; do + unit_test comm_client_test "$socket" +done + +wait $pid diff --git a/ctdb/tests/src/comm_client_test.c b/ctdb/tests/src/comm_client_test.c new file mode 100644 index 00000000000..d3f5f9e20bf --- /dev/null +++ b/ctdb/tests/src/comm_client_test.c @@ -0,0 +1,207 @@ +/* + comm tests + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/filesys.h" + +#include <assert.h> + +#include "common/pkt_read.c" +#include "common/pkt_write.c" +#include "common/comm.c" + + +struct writer_state { + struct tevent_context *ev; + struct comm_context *comm; + uint8_t *buf; + size_t *pkt_size; + int count, id; +}; + +static void writer_done(struct tevent_req *subreq); +static void read_handler(uint8_t *buf, size_t buflen, void *private_data); +static void dead_handler(void *private_data); + +static struct tevent_req *writer_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + int fd, size_t *pkt_size, + int count) +{ + struct tevent_req *req, *subreq; + struct writer_state *state; + size_t max_size = 0, buflen; + int i, ret; + + for (i=0; i<count; i++) { + if (pkt_size[i] > max_size) { + max_size = pkt_size[i]; + } + } + + req = tevent_req_create(mem_ctx, &state, struct writer_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->pkt_size = pkt_size; + state->count = count; + state->id = 0; + + ret = comm_setup(state, ev, fd, read_handler, req, + dead_handler, req, &state->comm); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + state->buf = talloc_array(state, uint8_t, max_size); + if (state->buf == NULL) { + talloc_free(req); + return NULL; + } + for (i=0; i<max_size; i++) { + state->buf[i] = i%256; + } + + buflen = state->pkt_size[state->id]; + *(uint32_t *)state->buf = buflen; + subreq = comm_write_send(state, state->ev, state->comm, + state->buf, buflen); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, writer_done, req); + + return req; +} + +static void writer_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + bool ret; + int err; + + ret = comm_write_recv(subreq, &err); + TALLOC_FREE(subreq); + if (!ret) { + tevent_req_error(req, err); + return; + } +} + +static void read_handler(uint8_t *buf, size_t buflen, void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct writer_state *state = tevent_req_data( + req, struct writer_state); + struct tevent_req *subreq; + + if (buflen != state->pkt_size[state->id]) { + tevent_req_error(req, EIO); + return; + } + + state->id++; + if (state->id >= state->count) { + tevent_req_done(req); + return; + } + + buflen = state->pkt_size[state->id]; + *(uint32_t *)state->buf = buflen; + subreq = comm_write_send(state, state->ev, state->comm, + state->buf, buflen); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, writer_done, req); +} + +static void dead_handler(void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + + tevent_req_error(req, EPIPE); +} + +static void writer_recv(struct tevent_req *req, int *perr) +{ + if (tevent_req_is_unix_error(req, perr)) { + return; + } + *perr = 0; +} + +static int socket_init(char *sockpath) +{ + struct sockaddr_un addr; + int fd, ret; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, sockpath); + + fd = socket(AF_UNIX, SOCK_STREAM, 0); + assert(fd != -1); + + ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); + assert(ret != -1); + + return fd; +} + +int main(int argc, char *argv[]) +{ + TALLOC_CTX *mem_ctx; + struct tevent_context *ev; + struct tevent_req *req; + int fd; + size_t pkt_size[13] = { 100, 2048, 500, 4096, 1024, 8192, + 200, 16384, 300, 32768, 400, 65536, + 1024*1024 }; + int err; + + if (argc != 2) { + printf("Usage: %s <sockpath>\n", argv[0]); + exit(1); + } + + mem_ctx = talloc_new(NULL); + assert(mem_ctx != NULL); + + ev = tevent_context_init(mem_ctx); + assert(ev != NULL); + + fd = socket_init(argv[1]); + + req = writer_send(mem_ctx, ev, fd, pkt_size, 13); + assert(req != NULL); + + tevent_req_poll(req, ev); + + writer_recv(req, &err); + assert(err == 0); + + exit(0); +} diff --git a/ctdb/tests/src/comm_server_test.c b/ctdb/tests/src/comm_server_test.c new file mode 100644 index 00000000000..fe0fffdf3d0 --- /dev/null +++ b/ctdb/tests/src/comm_server_test.c @@ -0,0 +1,363 @@ +/* + comm tests + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/filesys.h" + +#include <assert.h> + +#include "common/pkt_read.c" +#include "common/pkt_write.c" +#include "common/comm.c" + + +struct accept_state { + int listen_fd; + struct tevent_fd *fde; + int client_fd; +}; + +static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde, + uint16_t flags, void *private_data); + +static struct tevent_req *accept_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + int listen_fd) +{ + struct tevent_req *req; + struct accept_state *state; + + req = tevent_req_create(mem_ctx, &state, struct accept_state); + if (req == NULL) { + return NULL; + } + + state->listen_fd = listen_fd; + + state->fde = tevent_add_fd(ev, state, listen_fd, TEVENT_FD_READ, + accept_handler, req); + if (tevent_req_nomem(state->fde, req)) { + return tevent_req_post(req, ev); + } + return req; +} + +static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde, + uint16_t flags, void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct accept_state *state = tevent_req_data( + req, struct accept_state); + struct sockaddr addr; + socklen_t addrlen = sizeof(addr); + int ret; + + TALLOC_FREE(state->fde); + + if ((flags & TEVENT_FD_READ) == 0) { + tevent_req_error(req, EIO); + return; + } + + ret = accept(state->listen_fd, &addr, &addrlen); + if (ret == -1) { + tevent_req_error(req, errno); + return; + } + + state->client_fd = ret; + tevent_req_done(req); +} + +static int accept_recv(struct tevent_req *req, int *perr) +{ + struct accept_state *state = tevent_req_data( + req, struct accept_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return -1; + } + + return state->client_fd; +} + + +struct echo_state { + struct tevent_context *ev; + int fd; + struct comm_context *comm; + uint8_t *data; +}; + +static void read_handler(uint8_t *buf, size_t buflen, void *private_data); +static void read_failed(void *private_data); +static void write_done(struct tevent_req *subreq); + +static struct tevent_req *echo_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, int fd) +{ + struct tevent_req *req; + struct echo_state *state; + int ret; + + req = tevent_req_create(mem_ctx, &state, struct echo_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->fd = fd; + + ret = comm_setup(state, ev, fd, read_handler, req, + read_failed, req, &state->comm); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + return req; +} + +static void read_handler(uint8_t *buf, size_t buflen, void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct echo_state *state = tevent_req_data( + req, struct echo_state); + struct tevent_req *subreq; + + state->data = talloc_memdup(state, buf, buflen); + if (tevent_req_nomem(state->data, req)) { + return; + } + + subreq = comm_write_send(state, state->ev, state->comm, + state->data, buflen); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, write_done, req); +} + +static void read_failed(void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + + tevent_req_done(req); +} + +static void write_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct echo_state *state = tevent_req_data( + req, struct echo_state); + bool ret; + int err; + + TALLOC_FREE(state->data); + + ret = comm_write_recv(subreq, &err); + TALLOC_FREE(subreq); + if (!ret) { + tevent_req_error(req, err); + return; + } +} + +static bool echo_recv(struct tevent_req *req, int *perr) +{ + struct echo_state *state = tevent_req_data( + req, struct echo_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + return false; + } + + close(state->fd); + return true; +} + + +struct socket_process_state { + struct tevent_context *ev; + int fd; + int max_clients; + int num_clients; +}; + +static void socket_process_client(struct tevent_req *subreq); +static void socket_process_client_done(struct tevent_req *subreq); + +static struct tevent_req *socket_process_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + int fd, int max_clients) +{ + struct tevent_req *req, *subreq; + struct socket_process_state *state; + + req = tevent_req_create(mem_ctx, &state, struct socket_process_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->fd = fd; + state->max_clients = max_clients; + state->num_clients = 0; + + subreq = accept_send(state, ev, fd); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, socket_process_client, req); + + return req; +} + +static void socket_process_client(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct socket_process_state *state = tevent_req_data( + req, struct socket_process_state); + int client_fd, err; + + client_fd = accept_recv(subreq, &err); + TALLOC_FREE(subreq); + + state->num_clients++; + + if (client_fd == -1) { + tevent_req_error(req, err); + return; + } + + subreq = echo_send(state, state->ev, client_fd); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, socket_process_client_done, req); + + if (state->num_clients == state->max_clients) { + /* Stop accepting any more clients */ + return; + } + + subreq = accept_send(state, state->ev, state->fd); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, socket_process_client, req); +} + +static void socket_process_client_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct socket_process_state *state = tevent_req_data( + req, struct socket_process_state); + bool ret; + int err = 0; + + ret = echo_recv(subreq, &err); + TALLOC_FREE(subreq); + if (!ret) { + tevent_req_error(req, EIO); + return; + } + + if (state->num_clients == state->max_clients) { + tevent_req_done(req); + } +} + +static void socket_process_recv(struct tevent_req *req, int *perr) +{ + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + } +} + +static int socket_init(char *sockpath) +{ + struct sockaddr_un addr; + int fd, ret; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, sockpath); + + fd = socket(AF_UNIX, SOCK_STREAM, 0); + assert(fd != -1); + + ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr)); + assert(ret != -1); + + ret = listen(fd, 10); + assert(ret != -1); + + return fd; +} + +int main(int argc, char *argv[]) +{ + TALLOC_CTX *mem_ctx; + struct tevent_context *ev; + struct tevent_req *req; + int fd, err = 0; + int num_clients; + + if (argc != 3) { + printf("Usage: %s <sockpath> <num_clients>\n", argv[0]); + exit(1); + } + + mem_ctx = talloc_new(NULL); + assert(mem_ctx != NULL); + + ev = tevent_context_init(mem_ctx); + assert(ev != NULL); + + fd = socket_init(argv[1]); + num_clients = atoi(argv[2]); + assert(num_clients > 0); + + req = socket_process_send(mem_ctx, ev, fd, num_clients); + assert(req != NULL); + + tevent_req_poll(req, ev); + + socket_process_recv(req, &err); + return err; +} diff --git a/ctdb/tests/src/comm_test.c b/ctdb/tests/src/comm_test.c new file mode 100644 index 00000000000..2189435528b --- /dev/null +++ b/ctdb/tests/src/comm_test.c @@ -0,0 +1,260 @@ +/* + comm tests + + Copyright (C) Amitay Isaacs 2015 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/filesys.h" + +#include <assert.h> + +#include "common/pkt_read.c" +#include "common/pkt_write.c" +#include "common/comm.c" + +static void dead_handler(void *private_data) +{ + int dead_data = *(int *)private_data; + + assert(dead_data == 1 || dead_data == 2); + + if (dead_data == 1) { + /* reader */ + printf("writer closed pipe\n"); + } else { + /* writer */ + printf("reader closed pipe\n"); + } +} + +struct writer_state { + struct tevent_context *ev; + struct comm_context *comm; + uint8_t *buf; + size_t *pkt_size; + int count, id; +}; + +static void writer_next(struct tevent_req *subreq); + +static struct tevent_req *writer_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct comm_context *comm, + size_t *pkt_size, int count) +{ + struct tevent_req *req, *subreq; + struct writer_state *state; + size_t max_size = 0, buflen; + int i; + + for (i=0; i<count; i++) { + if (pkt_size[i] > max_size) { + max_size = pkt_size[i]; + } + } + + req = tevent_req_create(mem_ctx, &state, struct writer_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->comm = comm; + state->pkt_size = pkt_size; + state->count = count; + state->id = 0; + + state->buf = talloc_array(state, uint8_t, max_size); + if (state->buf == NULL) { + talloc_free(req); + return NULL; + } + for (i=0; i<max_size; i++) { + state->buf[i] = i%256; + } + + buflen = state->pkt_size[state->id]; + *(uint32_t *)state->buf = buflen; + subreq = comm_write_send(state, state->ev, state->comm, + state->buf, buflen); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + + tevent_req_set_callback(subreq, writer_next, req); + return req; +} + +static void writer_next(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct writer_state *state = tevent_req_data( + req, struct writer_state); + bool ret; + int err; + size_t buflen; + + ret = comm_write_recv(subreq, &err); + TALLOC_FREE(subreq); + if (!ret) { + tevent_req_error(req, err); + return; + } + + state->id++; + if (state->id >= state->count) { + tevent_req_done(req); + return; + } + + buflen = state->pkt_size[state->id]; + *(uint32_t *)state->buf = buflen; + subreq = comm_write_send(state, state->ev, state->comm, + state->buf, buflen); + if (tevent_req_nomem(subreq, req)) { + return; + } + + tevent_req_set_callback(subreq, writer_next, req); +} + +static void writer_recv(struct tevent_req *req, int *perr) +{ + if (tevent_req_is_unix_error(req, perr)) { + return; + } + *perr = 0; +} + +static void writer(int fd, size_t *pkt_size, int count) +{ + TALLOC_CTX *mem_ctx; + struct tevent_context *ev; + struct comm_context *comm; + struct tevent_req *req; + int dead_data = 2; + int err; + + mem_ctx = talloc_new(NULL); + assert(mem_ctx != NULL); + + ev = tevent_context_init(mem_ctx); + assert(ev != NULL); + + err = comm_setup(mem_ctx, ev, fd, NULL, NULL, + dead_handler, &dead_data, &comm); + assert(err == 0); + assert(comm != NULL); + + req = writer_send(mem_ctx, ev, comm, pkt_size, count); + assert(req != NULL); + + tevent_req_poll(req, ev); + + writer_recv(req, &err); + assert(err == 0); + + talloc_free(mem_ctx); +} + +struct reader_state { + size_t *pkt_size; + int count, received; + bool done; +}; + +static void reader_handler(uint8_t *buf, size_t buflen, void *private_data) +{ + struct reader_state *state = talloc_get_type_abort( + private_data, struct reader_state); + + assert(buflen == state->pkt_size[state->received]); + printf("%zi ", buflen); + state->received++; + + if (state->received == state->count) { + printf("\n"); + state->done = true; + } +} + +static void reader(int fd, size_t *pkt_size, int count) +{ + TALLOC_CTX *mem_ctx; + struct tevent_context *ev; + struct comm_context *comm; + struct reader_state *state; + int dead_data = 1; + int err; + + mem_ctx = talloc_new(NULL); + assert(mem_ctx != NULL); + + ev = tevent_context_init(mem_ctx); + assert(ev != NULL); + + state = talloc_zero(mem_ctx, struct reader_state); + assert(state != NULL); + + state->pkt_size = pkt_size; + state->count = count; + state->received = 0; + state->done = false; + + err = comm_setup(mem_ctx, ev, fd, reader_handler, state, + dead_handler, &dead_data, &comm); + assert(err == 0); + assert(comm != NULL); + + while (!state->done) { + tevent_loop_once(ev); + } + + talloc_free(mem_ctx); +} + +int main(void) +{ + int fd[2]; + int ret; + pid_t pid; + size_t pkt_size[13] = { 100, 2048, 500, 4096, 1024, 8192, + 200, 16384, 300, 32768, 400, 65536, + 1024*1024 }; + + + ret = pipe(fd); + assert(ret == 0); + + pid = fork(); + assert(pid != -1); + + if (pid == 0) { + /* Child process */ + close(fd[0]); + writer(fd[1], pkt_size, 13); + close(fd[1]); + exit(0); + } + + close(fd[1]); + reader(fd[0], pkt_size, 13); + close(fd[0]); + + return 0; +} diff --git a/ctdb/wscript b/ctdb/wscript index 454f03f9727..a08d8617c6b 100755 --- a/ctdb/wscript +++ b/ctdb/wscript @@ -336,7 +336,7 @@ def build(bld): bld.SAMBA_SUBSYSTEM('ctdb-util', source=bld.SUBDIR('common', '''db_hash.c srvid.c reqid.c - pkt_read.c pkt_write.c'''), + pkt_read.c pkt_write.c comm.c'''), deps='replace talloc tevent tdb tevent-unix-util') bld.SAMBA_SUBSYSTEM('ctdb-client', @@ -580,6 +580,9 @@ def build(bld): 'srvid_test', 'pkt_read_test', 'pkt_write_test', + 'comm_test', + 'comm_server_test', + 'comm_client_test', ] for target in ctdb_unit_tests: |