diff options
author | Amitay Isaacs <amitay@gmail.com> | 2017-04-06 19:33:47 +1000 |
---|---|---|
committer | Martin Schwenke <martins@samba.org> | 2017-10-10 11:45:19 +0200 |
commit | ad1a9176d1ffa1be29cf090821acbdb23476c252 (patch) | |
tree | 05e0d157bc613d1c54b1712744253e28c8d36375 /ctdb/client | |
parent | c700464d2330f3cede96349c0cdcc55bbb88a5a0 (diff) | |
download | samba-ad1a9176d1ffa1be29cf090821acbdb23476c252.tar.gz |
ctdb-client: Add client api for using tunnels
Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
Diffstat (limited to 'ctdb/client')
-rw-r--r-- | ctdb/client/client.h | 206 | ||||
-rw-r--r-- | ctdb/client/client_connect.c | 11 | ||||
-rw-r--r-- | ctdb/client/client_private.h | 13 | ||||
-rw-r--r-- | ctdb/client/client_tunnel.c | 689 |
4 files changed, 919 insertions, 0 deletions
diff --git a/ctdb/client/client.h b/ctdb/client/client.h index 5e3b5c6a795..5be54b2dc0a 100644 --- a/ctdb/client/client.h +++ b/ctdb/client/client.h @@ -42,6 +42,11 @@ struct ctdb_client_context; /** + * @brief The abstract context that holds a tunnel endpoint + */ +struct ctdb_tunnel_context; + +/** * @brief The abstract context that represents a clustered database */ struct ctdb_db_context; @@ -67,6 +72,17 @@ struct ctdb_transaction_handle; typedef void (*ctdb_client_callback_func_t)(void *private_data); /** + * @brief Tunnel callback function + * + * This function is registered when a tunnel endpoint is set up. When the + * tunnel endpoint receives a message, this function is invoked. + */ +typedef void (*ctdb_tunnel_callback_func_t)(struct ctdb_tunnel_context *tctx, + uint32_t srcnode, uint32_t reqid, + uint8_t *buf, size_t buflen, + void *private_data); + +/** * @brief Initialize and connect to ctdb daemon * * This returns a ctdb client context. Freeing this context will free the @@ -545,6 +561,196 @@ int ctdb_client_control_multi_error(uint32_t *pnn_list, int count, int *err_list, uint32_t *pnn); /** + * @brief Async computation start to setup a tunnel endpoint + * + * This computation sets up a tunnel endpoint corresponding to a tunnel_id. + * A tunnel is a ctdb transport to deliver new protocol between endpoints. + * + * For two endpoints to communicate using new protocol, + * 1. Set up tunnel endpoints + * 2. Send requests + * 3. Send replies + * 4. Destroy tunnel endpoints + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] tunnel_id Unique tunnel id + * @param[in] callback Callback function to call when a message is received + * @param[in] private_data Private data for callback + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_tunnel_setup_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint64_t tunnel_id, + ctdb_tunnel_callback_func_t callback, + void *private_data); + +/** + * @brief Async computation end to setup a tunnel + * + * @param[in] req Tevent request + * @param[in] perr errno in case of failure + * @param[out] result A new tunnel context + * @return true on success, false on failure + * + * Tunnel context should never be freed by user. + */ +bool ctdb_tunnel_setup_recv(struct tevent_req *req, int *perr, + struct ctdb_tunnel_context **result); + +/** + * @brief Sync wrapper for ctdb_tunnel_setup computation + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] client Client connection context + * @param[in] tunnel_id Unique tunnel id + * @param[in] callback Callback function to call when a message is received + * @param[in] private_data Private data for callback + * @param[out] result A new tunnel context + * @return 0 on success, errno on failure + */ +int ctdb_tunnel_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, uint64_t tunnel_id, + ctdb_tunnel_callback_func_t callback, void *private_data, + struct ctdb_tunnel_context **result); + +/** + * @brief Async computation start to destroy a tunnel endpoint + * + * This computation destroys the tunnel endpoint. + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] tctx Tunnel context + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_tunnel_destroy_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_tunnel_context *tctx); + +/** + * @brief Async computation end to destroy a tunnel endpoint + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_tunnel_destroy_recv(struct tevent_req *req, int *perr); + +/** + * @brief Sync wrapper for ctdb_tunnel_destroy computation + * + * @param[in] ev Tevent context + * @param[in] tctx Tunnel context + * @return 0 on success, errno on failure + */ +int ctdb_tunnel_destroy(struct tevent_context *ev, + struct ctdb_tunnel_context *tctx); + +/** + * @brief Async computation start to send a request via a tunnel + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] tctx Tunnel context + * @param[in] destnode PNN of destination + * @param[in] timeout How long to wait + * @param[in] buf Message to send + * @param[in] buflen Size of the message to send + * @param[in] wait_for_reply Whether to wait for reply + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_tunnel_request_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, + int destnode, + struct timeval timeout, + uint8_t *buf, size_t buflen, + bool wait_for_reply); + +/** + * @brief Async computation end to send a request via a tunnel + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @param[in] mem_ctx Talloc context + * @param[out] buf Reply data if expected + * @param[out] buflen Size of reply data if expected + * @return true on success, false on failure + */ +bool ctdb_tunnel_request_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, uint8_t **buf, + size_t *buflen); + +/** + * @brief Sync wrapper for ctdb_tunnel_request computation + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] tctx Tunnel context + * @param[in] destnode PNN of destination + * @param[in] timeout How long to wait + * @param[in] buf Message to send + * @param[in] buflen Size of the message to send + * @param[in] wait_for_reply Whether to wait for reply + * @return 0 on success, errno on failure + */ +int ctdb_tunnel_request(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, int destnode, + struct timeval timeout, uint8_t *buf, size_t buflen, + bool wait_for_reply); + +/** + * @brief Async computation start to send a reply via a tunnel + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] tctx Tunnel context + * @param[in] destnode PNN of destination + * @param[in] reqid Request id + * @param[in] timeout How long to wait + * @param[in] buf Reply data + * @param[in] buflen Size of reply data + * @return a new tevent req on success, NULL on failure + */ +struct tevent_req *ctdb_tunnel_reply_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, + int destnode, uint32_t reqid, + struct timeval timeout, + uint8_t *buf, size_t buflen); + +/** + * @brief Async computation end to send a reply via a tunnel + * + * @param[in] req Tevent request + * @param[out] perr errno in case of failure + * @return true on success, false on failure + */ +bool ctdb_tunnel_reply_recv(struct tevent_req *req, int *perr); + +/** + * @brief Sync wrapper for ctdb_tunnel_reply computation + * + * @param[in] mem_ctx Talloc memory context + * @param[in] ev Tevent context + * @param[in] tctx Tunnel context + * @param[in] destnode PNN of destination + * @param[in] reqid Request id + * @param[in] timeout How long to wait + * @param[in] buf Reply data + * @param[in] buflen Size of reply data + * @return 0 on success, errno on failure + */ +int ctdb_tunnel_reply(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, int destnode, + uint32_t reqid, struct timeval timeout, + uint8_t *buf, size_t buflen); + +/** * @brief Async computation start to attach a database * * @param[in] mem_ctx Talloc memory context diff --git a/ctdb/client/client_connect.c b/ctdb/client/client_connect.c index 6fd2c87da13..ed4371f2c89 100644 --- a/ctdb/client/client_connect.c +++ b/ctdb/client/client_connect.c @@ -72,6 +72,13 @@ int ctdb_client_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev, return ret; } + ret = srvid_init(client, &client->tunnels); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("srvid_init() failed, ret=%d\n", ret)); + talloc_free(client); + return ret; + } + client->fd = -1; client->pnn = CTDB_UNKNOWN_PNN; @@ -199,6 +206,10 @@ static void client_read_handler(uint8_t *buf, size_t buflen, ctdb_client_reply_control(client, buf, buflen, hdr.reqid); break; + case CTDB_REQ_TUNNEL: + ctdb_client_req_tunnel(client, buf, buflen, hdr.reqid); + break; + default: break; } diff --git a/ctdb/client/client_private.h b/ctdb/client/client_private.h index 5af019ba342..bb1705534e6 100644 --- a/ctdb/client/client_private.h +++ b/ctdb/client/client_private.h @@ -35,6 +35,7 @@ struct ctdb_db_context { struct ctdb_client_context { struct reqid_context *idr; struct srvid_context *srv; + struct srvid_context *tunnels; struct comm_context *comm; ctdb_client_callback_func_t callback; void *private_data; @@ -64,6 +65,13 @@ struct ctdb_transaction_handle { bool updated; }; +struct ctdb_tunnel_context { + struct ctdb_client_context *client; + uint64_t tunnel_id; + ctdb_tunnel_callback_func_t callback; + void *private_data; +}; + /* From client_call.c */ void ctdb_client_reply_call(struct ctdb_client_context *client, @@ -79,4 +87,9 @@ void ctdb_client_req_message(struct ctdb_client_context *client, void ctdb_client_reply_control(struct ctdb_client_context *client, uint8_t *buf, size_t buflen, uint32_t reqid); +/* From client_tunnel.c */ + +void ctdb_client_req_tunnel(struct ctdb_client_context *client, + uint8_t *buf, size_t buflen, uint32_t reqid); + #endif /* __CTDB_CLIENT_PRIVATE_H__ */ diff --git a/ctdb/client/client_tunnel.c b/ctdb/client/client_tunnel.c new file mode 100644 index 00000000000..11715b444a6 --- /dev/null +++ b/ctdb/client/client_tunnel.c @@ -0,0 +1,689 @@ +/* + CTDB client code + + Copyright (C) Amitay Isaacs 2016 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. +*/ + +#include "replace.h" +#include "system/network.h" + +#include <talloc.h> +#include <tevent.h> +#include <tdb.h> + +#include "lib/util/tevent_unix.h" + +#include "common/reqid.h" +#include "common/srvid.h" +#include "common/comm.h" + +#include "protocol/protocol.h" +#include "protocol/protocol_api.h" + +#include "client/client_private.h" +#include "client/client.h" + + +struct ctdb_tunnel_data { + struct ctdb_req_header hdr; + struct ctdb_req_tunnel *tunnel; + uint32_t reqid; +}; + +/* + * Tunnel setup and destroy + */ + +struct ctdb_tunnel_setup_state { + struct ctdb_client_context *client; + struct ctdb_tunnel_context *tctx; + uint64_t tunnel_id; +}; + +static void ctdb_tunnel_setup_register_done(struct tevent_req *subreq); +static void ctdb_tunnel_handler(uint64_t tunnel_id, TDB_DATA data, + void *private_data); + +struct tevent_req *ctdb_tunnel_setup_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint64_t tunnel_id, + ctdb_tunnel_callback_func_t callback, + void *private_data) +{ + struct tevent_req *req, *subreq; + struct ctdb_tunnel_setup_state *state; + struct ctdb_tunnel_context *tctx; + struct ctdb_req_control request; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_tunnel_setup_state); + if (req == NULL) { + return NULL; + } + + tctx = talloc_zero(client, struct ctdb_tunnel_context); + if (tevent_req_nomem(tctx, req)) { + return tevent_req_post(req, ev); + } + + tctx->client = client; + tctx->tunnel_id = tunnel_id; + tctx->callback = callback; + tctx->private_data = private_data; + + state->client = client; + state->tunnel_id = tunnel_id; + state->tctx = tctx; + + ret = srvid_exists(client->tunnels, tunnel_id, NULL); + if (ret == 0) { + tevent_req_error(req, EEXIST); + return tevent_req_post(req, ev); + } + + ctdb_req_control_tunnel_register(&request, tunnel_id); + subreq = ctdb_client_control_send(state, ev, client, + ctdb_client_pnn(client), + tevent_timeval_zero(), + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_tunnel_setup_register_done, req); + + return req; +} + +static void ctdb_tunnel_setup_register_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_tunnel_setup_state *state = tevent_req_data( + req, struct ctdb_tunnel_setup_state); + struct ctdb_reply_control *reply; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_tunnel_register(reply); + talloc_free(reply); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + ret = srvid_register(state->client->tunnels, state->client, + state->tunnel_id, + ctdb_tunnel_handler, state->tctx); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +static void ctdb_tunnel_handler(uint64_t tunnel_id, TDB_DATA data, + void *private_data) +{ + struct ctdb_tunnel_context *tctx = talloc_get_type_abort( + private_data, struct ctdb_tunnel_context); + struct ctdb_tunnel_data *tunnel_data; + + if (tctx->tunnel_id != tunnel_id) { + return; + } + + if (data.dsize != sizeof(tunnel_data)) { + return; + } + + tunnel_data = (struct ctdb_tunnel_data *)data.dptr; + + tctx->callback(tctx, tunnel_data->hdr.srcnode, tunnel_data->reqid, + tunnel_data->tunnel->data.dptr, + tunnel_data->tunnel->data.dsize, tctx->private_data); +} + +bool ctdb_tunnel_setup_recv(struct tevent_req *req, int *perr, + struct ctdb_tunnel_context **result) +{ + struct ctdb_tunnel_setup_state *state = tevent_req_data( + req, struct ctdb_tunnel_setup_state); + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + *result = state->tctx; + return true; +} + +int ctdb_tunnel_setup(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, uint64_t tunnel_id, + ctdb_tunnel_callback_func_t callback, void *private_data, + struct ctdb_tunnel_context **result) +{ + struct tevent_req *req; + int ret; + bool status; + + req = ctdb_tunnel_setup_send(mem_ctx, ev, client, tunnel_id, + callback, private_data); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_tunnel_setup_recv(req, &ret, result); + talloc_free(req); + if (! status) { + return ret; + } + + return 0; +} + +struct ctdb_tunnel_destroy_state { + struct ctdb_tunnel_context *tctx; +}; + +static void ctdb_tunnel_destroy_deregister_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_tunnel_destroy_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_tunnel_context *tctx) +{ + struct tevent_req *req, *subreq; + struct ctdb_tunnel_destroy_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_tunnel_destroy_state); + if (req == NULL) { + return NULL; + } + + state->tctx = tctx; + + ctdb_req_control_tunnel_deregister(&request, tctx->tunnel_id); + subreq = ctdb_client_control_send(state, ev, tctx->client, + ctdb_client_pnn(tctx->client), + tevent_timeval_zero(), + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_tunnel_destroy_deregister_done, + req); + + return req; +} + +static void ctdb_tunnel_destroy_deregister_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_tunnel_destroy_state *state = tevent_req_data( + req, struct ctdb_tunnel_destroy_state); + struct ctdb_client_context *client = state->tctx->client; + struct ctdb_reply_control *reply; + bool status; + int ret; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_tunnel_deregister(reply); + talloc_free(reply); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + ret = srvid_deregister(client->tunnels, state->tctx->tunnel_id, + state->tctx); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +bool ctdb_tunnel_destroy_recv(struct tevent_req *req, int *perr) +{ + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + return true; +} + + +int ctdb_tunnel_destroy(struct tevent_context *ev, + struct ctdb_tunnel_context *tctx) +{ + struct tevent_req *req; + int ret; + bool status; + + req = ctdb_tunnel_destroy_send(ev, ev, tctx); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_tunnel_destroy_recv(req, &ret); + talloc_free(req); + if (! status) { + return ret; + } + + return 0; +} + +/* + * Callback when REQ_TUNNEL packet is received + */ + +static void ctdb_tunnel_request_reply(struct tevent_req *req, + struct ctdb_tunnel_data *tunnel_data); + +void ctdb_client_req_tunnel(struct ctdb_client_context *client, + uint8_t *buf, size_t buflen, uint32_t reqid) +{ + TALLOC_CTX *tmp_ctx = talloc_new(client); + struct ctdb_req_header h; + struct ctdb_req_tunnel *tunnel; + struct tevent_req *req; + struct ctdb_tunnel_data tunnel_data; + int ret; + + tunnel = talloc_zero(tmp_ctx, struct ctdb_req_tunnel); + if (tunnel == NULL) { + goto fail; + } + + ret = ctdb_req_tunnel_pull(buf, buflen, &h, tmp_ctx, tunnel); + if (ret != 0) { + goto fail; + } + + tunnel_data = (struct ctdb_tunnel_data) { + .hdr = h, + .tunnel = tunnel, + .reqid = reqid, + }; + + if (tunnel->flags & CTDB_TUNNEL_FLAG_REPLY) { + req = reqid_find(client->idr, reqid, struct tevent_req); + if (req == NULL) { + goto fail; + } + + ctdb_tunnel_request_reply(req, &tunnel_data); + + } else if (tunnel->flags & CTDB_TUNNEL_FLAG_REQUEST) { + + TDB_DATA data = { + .dsize = sizeof(&tunnel_data), + .dptr = (uint8_t *)&tunnel_data, + }; + + srvid_dispatch(client->tunnels, tunnel->tunnel_id, 0, data); + } + +fail: + TALLOC_FREE(tmp_ctx); +} + + +/* + * Send messages using tunnel + */ + +struct ctdb_tunnel_request_state { + struct ctdb_tunnel_context *tctx; + bool wait_for_reply; + uint32_t reqid; + struct ctdb_req_tunnel *tunnel; +}; + +static int ctdb_tunnel_request_state_destructor( + struct ctdb_tunnel_request_state *state); +static void ctdb_tunnel_request_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_tunnel_request_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, + int destnode, + struct timeval timeout, + uint8_t *buf, size_t buflen, + bool wait_for_reply) +{ + struct tevent_req *req, *subreq; + struct ctdb_tunnel_request_state *state; + struct ctdb_req_tunnel tunnel; + struct ctdb_req_header h; + uint8_t *pkt; + size_t datalen, pkt_len; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_tunnel_request_state); + if (req == NULL) { + return NULL; + } + + state->tctx = tctx; + state->wait_for_reply = wait_for_reply; + state->reqid = reqid_new(tctx->client->idr, req); + if (state->reqid == REQID_INVALID) { + talloc_free(req); + return NULL; + } + + talloc_set_destructor(state, ctdb_tunnel_request_state_destructor); + + tunnel = (struct ctdb_req_tunnel) { + .tunnel_id = state->tctx->tunnel_id, + .flags = CTDB_TUNNEL_FLAG_REQUEST, + .data = (TDB_DATA) { + .dptr = buf, + .dsize = buflen, + }, + }; + + if (destnode == CTDB_BROADCAST_ALL || + destnode == CTDB_BROADCAST_VNNMAP || + destnode == CTDB_BROADCAST_ALL) { + state->wait_for_reply = false; + } + if (! state->wait_for_reply) { + tunnel.flags |= CTDB_TUNNEL_FLAG_NOREPLY; + } + + ctdb_req_header_fill(&h, 0, CTDB_REQ_TUNNEL, destnode, + ctdb_client_pnn(state->tctx->client), + state->reqid); + + datalen = ctdb_req_tunnel_len(&h, &tunnel); + ret = ctdb_allocate_pkt(state, datalen, &pkt, &pkt_len); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = ctdb_req_tunnel_push(&h, &tunnel, pkt, &pkt_len); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + if (!tevent_timeval_is_zero(&timeout)) { + tevent_req_set_endtime(req, ev, timeout); + } + + subreq = comm_write_send(state, ev, tctx->client->comm, + pkt, pkt_len); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_tunnel_request_done, req); + + return req; +} + +static int ctdb_tunnel_request_state_destructor( + struct ctdb_tunnel_request_state *state) +{ + reqid_remove(state->tctx->client->idr, state->reqid); + return 0; +} + +static void ctdb_tunnel_request_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_tunnel_request_state *state = tevent_req_data( + req, struct ctdb_tunnel_request_state); + int ret; + bool status; + + status = comm_write_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + if (! state->wait_for_reply) { + tevent_req_done(req); + } + + /* Wait for the reply or timeout */ +} + +static void ctdb_tunnel_request_reply(struct tevent_req *req, + struct ctdb_tunnel_data *tunnel_data) +{ + struct ctdb_tunnel_request_state *state = tevent_req_data( + req, struct ctdb_tunnel_request_state); + + if (tunnel_data->reqid != state->reqid) { + return; + } + + state->tunnel = talloc_steal(state, tunnel_data->tunnel); + tevent_req_done(req); +} + +bool ctdb_tunnel_request_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, uint8_t **buf, + size_t *buflen) +{ + struct ctdb_tunnel_request_state *state = tevent_req_data( + req, struct ctdb_tunnel_request_state); + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + if (state->wait_for_reply) { + if (buf != NULL) { + *buf = talloc_steal(mem_ctx, state->tunnel->data.dptr); + } + if (buflen != NULL) { + *buflen = state->tunnel->data.dsize; + } + } + + return true; +} + +int ctdb_tunnel_request(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, int destnode, + struct timeval timeout, uint8_t *buf, size_t buflen, + bool wait_for_reply) +{ + struct tevent_req *req; + int ret; + bool status; + + req = ctdb_tunnel_request_send(mem_ctx, ev, tctx, destnode, + timeout, buf, buflen, wait_for_reply); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_tunnel_request_recv(req, &ret, NULL, NULL, NULL); + talloc_free(req); + if (! status) { + return ret; + } + + return 0; +} + +struct ctdb_tunnel_reply_state { +}; + +static void ctdb_tunnel_reply_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_tunnel_reply_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, + int destnode, uint32_t reqid, + struct timeval timeout, + uint8_t *buf, size_t buflen) +{ + struct tevent_req *req, *subreq; + struct ctdb_tunnel_reply_state *state; + struct ctdb_req_tunnel tunnel; + struct ctdb_req_header h; + uint8_t *pkt; + size_t datalen, pkt_len; + int ret; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_tunnel_reply_state); + if (req == NULL) { + return NULL; + } + + tunnel = (struct ctdb_req_tunnel) { + .tunnel_id = tctx->tunnel_id, + .flags = CTDB_TUNNEL_FLAG_REPLY, + .data = (TDB_DATA) { + .dptr = buf, + .dsize = buflen, + }, + }; + + ctdb_req_header_fill(&h, 0, CTDB_REQ_TUNNEL, destnode, + ctdb_client_pnn(tctx->client), reqid); + + datalen = ctdb_req_tunnel_len(&h, &tunnel); + ret = ctdb_allocate_pkt(state, datalen, &pkt, &pkt_len); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + ret = ctdb_req_tunnel_push(&h, &tunnel, pkt, &pkt_len); + if (ret != 0) { + tevent_req_error(req, ret); + return tevent_req_post(req, ev); + } + + if (!tevent_timeval_is_zero(&timeout)) { + tevent_req_set_endtime(req, ev, timeout); + } + + subreq = comm_write_send(state, ev, tctx->client->comm, pkt, pkt_len); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_tunnel_reply_done, req); + + return req; +} + +static void ctdb_tunnel_reply_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + int ret; + bool status; + + status = comm_write_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +bool ctdb_tunnel_reply_recv(struct tevent_req *req, int *perr) +{ + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + return true; +} + +int ctdb_tunnel_reply(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_tunnel_context *tctx, int destnode, + uint32_t reqid, struct timeval timeout, + uint8_t *buf, size_t buflen) +{ + struct tevent_req *req; + int ret; + bool status; + + req = ctdb_tunnel_reply_send(mem_ctx, ev, tctx, destnode, reqid, + timeout, buf, buflen); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_tunnel_reply_recv(req, &ret); + talloc_free(req); + if (! status) { + return ret; + } + + return 0; +} |