summaryrefslogtreecommitdiff
path: root/ctdb
diff options
context:
space:
mode:
Diffstat (limited to 'ctdb')
-rw-r--r--ctdb/client/client.h10
-rw-r--r--ctdb/client/client_message.c132
2 files changed, 142 insertions, 0 deletions
diff --git a/ctdb/client/client.h b/ctdb/client/client.h
index df0b9b8bee7..9d4f73d7525 100644
--- a/ctdb/client/client.h
+++ b/ctdb/client/client.h
@@ -78,6 +78,16 @@ struct tevent_req *ctdb_client_message_send(TALLOC_CTX *mem_ctx,
bool ctdb_client_message_recv(struct tevent_req *req, int *perr);
+struct tevent_req *ctdb_client_message_multi_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ uint32_t *pnn_list, int count,
+ struct ctdb_req_message *message);
+
+bool ctdb_client_message_multi_recv(struct tevent_req *req, int *perr,
+ TALLOC_CTX *mem_ctx, int **perr_list);
+
int ctdb_client_message(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
struct ctdb_client_context *client,
uint32_t destnode, struct ctdb_req_message *message);
diff --git a/ctdb/client/client_message.c b/ctdb/client/client_message.c
index d5338420cd4..4b378a52c55 100644
--- a/ctdb/client/client_message.c
+++ b/ctdb/client/client_message.c
@@ -157,6 +157,138 @@ void ctdb_client_req_message(struct ctdb_client_context *client,
}
/*
+ * Handle multiple nodes
+ */
+
+struct ctdb_client_message_multi_state {
+ uint32_t *pnn_list;
+ int count;
+ int done;
+ int err;
+ int *err_list;
+};
+
+struct message_index_state {
+ struct tevent_req *req;
+ int index;
+};
+
+static void ctdb_client_message_multi_done(struct tevent_req *subreq);
+
+struct tevent_req *ctdb_client_message_multi_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ uint32_t *pnn_list, int count,
+ struct ctdb_req_message *message)
+{
+ struct tevent_req *req, *subreq;
+ struct ctdb_client_message_multi_state *state;
+ int i;
+
+ if (pnn_list == NULL || count == 0) {
+ return NULL;
+ }
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct ctdb_client_message_multi_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->pnn_list = pnn_list;
+ state->count = count;
+ state->done = 0;
+ state->err = 0;
+ state->err_list = talloc_zero_array(state, int, count);
+ if (tevent_req_nomem(state->err_list, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ for (i=0; i<count; i++) {
+ struct message_index_state *substate;
+
+ subreq = ctdb_client_message_send(state, ev, client,
+ pnn_list[i], message);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ substate = talloc(subreq, struct message_index_state);
+ if (tevent_req_nomem(substate, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ substate->req = req;
+ substate->index = i;
+
+ tevent_req_set_callback(subreq, ctdb_client_message_multi_done,
+ substate);
+ }
+
+ return req;
+}
+
+static void ctdb_client_message_multi_done(struct tevent_req *subreq)
+{
+ struct message_index_state *substate = tevent_req_callback_data(
+ subreq, struct message_index_state);
+ struct tevent_req *req = substate->req;
+ int idx = substate->index;
+ struct ctdb_client_message_multi_state *state = tevent_req_data(
+ req, struct ctdb_client_message_multi_state);
+ bool status;
+ int ret;
+
+ status = ctdb_client_message_recv(subreq, &ret);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ if (state->err == 0) {
+ state->err = ret;
+ state->err_list[idx] = state->err;
+ }
+ }
+
+ state->done += 1;
+
+ if (state->done == state->count) {
+ tevent_req_done(req);
+ }
+}
+
+bool ctdb_client_message_multi_recv(struct tevent_req *req, int *perr,
+ TALLOC_CTX *mem_ctx, int **perr_list)
+{
+ struct ctdb_client_message_multi_state *state = tevent_req_data(
+ req, struct ctdb_client_message_multi_state);
+ int err;
+
+ if (tevent_req_is_unix_error(req, &err)) {
+ if (perr != NULL) {
+ *perr = err;
+ }
+ if (perr_list != NULL) {
+ *perr_list = talloc_steal(mem_ctx, state->err_list);
+ }
+ return false;
+ }
+
+ if (perr != NULL) {
+ *perr = state->err;
+ }
+
+ if (perr_list != NULL) {
+ *perr_list = talloc_steal(mem_ctx, state->err_list);
+ }
+
+ if (state->err != 0) {
+ return false;
+ }
+
+ return true;
+}
+
+/*
* sync version of message send
*/