diff options
Diffstat (limited to 'ctdb')
-rw-r--r-- | ctdb/client/client.h | 10 | ||||
-rw-r--r-- | ctdb/client/client_message.c | 132 |
2 files changed, 142 insertions, 0 deletions
diff --git a/ctdb/client/client.h b/ctdb/client/client.h index df0b9b8bee7..9d4f73d7525 100644 --- a/ctdb/client/client.h +++ b/ctdb/client/client.h @@ -78,6 +78,16 @@ struct tevent_req *ctdb_client_message_send(TALLOC_CTX *mem_ctx, bool ctdb_client_message_recv(struct tevent_req *req, int *perr); +struct tevent_req *ctdb_client_message_multi_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, + struct ctdb_req_message *message); + +bool ctdb_client_message_multi_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, int **perr_list); + int ctdb_client_message(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, uint32_t destnode, struct ctdb_req_message *message); diff --git a/ctdb/client/client_message.c b/ctdb/client/client_message.c index d5338420cd4..4b378a52c55 100644 --- a/ctdb/client/client_message.c +++ b/ctdb/client/client_message.c @@ -157,6 +157,138 @@ void ctdb_client_req_message(struct ctdb_client_context *client, } /* + * Handle multiple nodes + */ + +struct ctdb_client_message_multi_state { + uint32_t *pnn_list; + int count; + int done; + int err; + int *err_list; +}; + +struct message_index_state { + struct tevent_req *req; + int index; +}; + +static void ctdb_client_message_multi_done(struct tevent_req *subreq); + +struct tevent_req *ctdb_client_message_multi_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, + struct ctdb_req_message *message) +{ + struct tevent_req *req, *subreq; + struct ctdb_client_message_multi_state *state; + int i; + + if (pnn_list == NULL || count == 0) { + return NULL; + } + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_client_message_multi_state); + if (req == NULL) { + return NULL; + } + + state->pnn_list = pnn_list; + state->count = count; + state->done = 0; + state->err = 0; + state->err_list = talloc_zero_array(state, int, count); + if (tevent_req_nomem(state->err_list, req)) { + return tevent_req_post(req, ev); + } + + for (i=0; i<count; i++) { + struct message_index_state *substate; + + subreq = ctdb_client_message_send(state, ev, client, + pnn_list[i], message); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + + substate = talloc(subreq, struct message_index_state); + if (tevent_req_nomem(substate, req)) { + return tevent_req_post(req, ev); + } + + substate->req = req; + substate->index = i; + + tevent_req_set_callback(subreq, ctdb_client_message_multi_done, + substate); + } + + return req; +} + +static void ctdb_client_message_multi_done(struct tevent_req *subreq) +{ + struct message_index_state *substate = tevent_req_callback_data( + subreq, struct message_index_state); + struct tevent_req *req = substate->req; + int idx = substate->index; + struct ctdb_client_message_multi_state *state = tevent_req_data( + req, struct ctdb_client_message_multi_state); + bool status; + int ret; + + status = ctdb_client_message_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + if (state->err == 0) { + state->err = ret; + state->err_list[idx] = state->err; + } + } + + state->done += 1; + + if (state->done == state->count) { + tevent_req_done(req); + } +} + +bool ctdb_client_message_multi_recv(struct tevent_req *req, int *perr, + TALLOC_CTX *mem_ctx, int **perr_list) +{ + struct ctdb_client_message_multi_state *state = tevent_req_data( + req, struct ctdb_client_message_multi_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + if (perr != NULL) { + *perr = err; + } + if (perr_list != NULL) { + *perr_list = talloc_steal(mem_ctx, state->err_list); + } + return false; + } + + if (perr != NULL) { + *perr = state->err; + } + + if (perr_list != NULL) { + *perr_list = talloc_steal(mem_ctx, state->err_list); + } + + if (state->err != 0) { + return false; + } + + return true; +} + +/* * sync version of message send */ |