diff options
-rw-r--r-- | source3/lib/ctdbd_conn.c | 141 | ||||
-rw-r--r-- | source3/lib/messages_ctdbd.c | 85 |
2 files changed, 85 insertions, 141 deletions
diff --git a/source3/lib/ctdbd_conn.c b/source3/lib/ctdbd_conn.c index a0dbd32434f..4e030184b22 100644 --- a/source3/lib/ctdbd_conn.c +++ b/source3/lib/ctdbd_conn.c @@ -130,20 +130,6 @@ NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid, return NT_STATUS_OK; } -static bool ctdb_is_our_srvid(struct ctdbd_connection *conn, uint64_t srvid) -{ - size_t i, num_callbacks; - - num_callbacks = talloc_array_length(conn->callbacks); - - for (i=0; i<num_callbacks; i++) { - if (srvid == conn->callbacks[i].srvid) { - return true; - } - } - return false; -} - static void ctdbd_msg_call_back(struct ctdbd_connection *conn, struct ctdb_req_message *msg) { @@ -292,78 +278,6 @@ static int ctdbd_connect(int *pfd) return 0; } -/* - * State necessary to defer an incoming message while we are waiting for a - * ctdb reply. - */ - -struct deferred_msg_state { - struct messaging_context *msg_ctx; - struct messaging_rec *rec; -}; - -/* - * Timed event handler for the deferred message - */ - -static void deferred_message_dispatch(struct tevent_context *event_ctx, - struct tevent_timer *te, - struct timeval now, - void *private_data) -{ - struct deferred_msg_state *state = talloc_get_type_abort( - private_data, struct deferred_msg_state); - - messaging_dispatch_rec(state->msg_ctx, state->rec); - TALLOC_FREE(state); - TALLOC_FREE(te); -} - -/* - * Fetch a messaging_rec from an incoming ctdb style message - */ - -static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx, - size_t overall_length, - struct ctdb_req_message *msg) -{ - struct messaging_rec *result; - DATA_BLOB blob; - enum ndr_err_code ndr_err; - - if ((overall_length < offsetof(struct ctdb_req_message, data)) - || (overall_length - < offsetof(struct ctdb_req_message, data) + msg->datalen)) { - - cluster_fatal("got invalid msg length"); - } - - if (!(result = talloc(mem_ctx, struct messaging_rec))) { - DEBUG(0, ("talloc failed\n")); - return NULL; - } - - blob = data_blob_const(msg->data, msg->datalen); - - ndr_err = ndr_pull_struct_blob( - &blob, result, result, - (ndr_pull_flags_fn_t)ndr_pull_messaging_rec); - - if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { - DEBUG(0, ("ndr_pull_struct_blob failed: %s\n", - ndr_errstr(ndr_err))); - TALLOC_FREE(result); - return NULL; - } - - if (DEBUGLEVEL >= 11) { - DEBUG(11, ("ctdb_pull_messaging_rec:\n")); - NDR_PRINT_DEBUG(messaging_rec, result); - } - - return result; -} - static NTSTATUS ctdb_read_packet(int fd, TALLOC_CTX *mem_ctx, struct ctdb_req_header **result) { @@ -447,8 +361,6 @@ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid, ctdb_packet_dump(hdr); if (hdr->operation == CTDB_REQ_MESSAGE) { - struct tevent_timer *evt; - struct deferred_msg_state *msg_state; struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr; if (conn->msg_ctx == NULL) { @@ -497,42 +409,7 @@ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid, } ctdbd_msg_call_back(conn, msg); - - msg_state = talloc(NULL, struct deferred_msg_state); - if (msg_state == NULL) { - DEBUG(0, ("talloc failed\n")); - TALLOC_FREE(hdr); - goto next_pkt; - } - - if (!(msg_state->rec = ctdb_pull_messaging_rec( - msg_state, msg->hdr.length, msg))) { - DEBUG(0, ("ctdbd_pull_messaging_rec failed\n")); - TALLOC_FREE(msg_state); - TALLOC_FREE(hdr); - goto next_pkt; - } - TALLOC_FREE(hdr); - - msg_state->msg_ctx = conn->msg_ctx; - - /* - * We're waiting for a call reply, but an async message has - * crossed. Defer dispatching to the toplevel event loop. - */ - evt = tevent_add_timer(messaging_tevent_context(conn->msg_ctx), - messaging_tevent_context(conn->msg_ctx), - timeval_zero(), - deferred_message_dispatch, - msg_state); - if (evt == NULL) { - DEBUG(0, ("event_add_timed failed\n")); - TALLOC_FREE(msg_state); - TALLOC_FREE(hdr); - goto next_pkt; - } - goto next_pkt; } @@ -626,11 +503,6 @@ NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx, return status; } - status = register_with_ctdbd(conn, (uint64_t)getpid(), NULL, NULL); - if (!NT_STATUS_IS_OK(status)) { - goto fail; - } - status = register_with_ctdbd(conn, MSG_SRVID_SAMBA, NULL, NULL); if (!NT_STATUS_IS_OK(status)) { goto fail; @@ -668,7 +540,6 @@ static NTSTATUS ctdb_handle_message(struct messaging_context *msg_ctx, struct ctdb_req_header *hdr) { struct ctdb_req_message *msg; - struct messaging_rec *msg_rec; if (hdr->operation != CTDB_REQ_MESSAGE) { DEBUG(0, ("Received async msg of type %u, discarding\n", @@ -717,18 +588,6 @@ static NTSTATUS ctdb_handle_message(struct messaging_context *msg_ctx, ctdbd_msg_call_back(conn, msg); - if (!ctdb_is_our_srvid(conn, msg->srvid)) { - DEBUG(0,("Got unexpected message with srvid=%llu\n", - (unsigned long long)msg->srvid)); - return NT_STATUS_OK; - } - - msg_rec = ctdb_pull_messaging_rec(talloc_tos(), msg->hdr.length, msg); - if (msg_rec == NULL) { - DEBUG(10, ("ctdb_pull_messaging_rec failed\n")); - return NT_STATUS_NO_MEMORY; - } - messaging_dispatch_rec(conn->msg_ctx, msg_rec); return NT_STATUS_OK; } diff --git a/source3/lib/messages_ctdbd.c b/source3/lib/messages_ctdbd.c index 799780e6a14..430dc51dbff 100644 --- a/source3/lib/messages_ctdbd.c +++ b/source3/lib/messages_ctdbd.c @@ -128,6 +128,88 @@ static int messaging_ctdbd_destructor(struct messaging_ctdbd_context *ctx) return 0; } +static struct messaging_rec *ctdb_pull_messaging_rec( + TALLOC_CTX *mem_ctx, const struct ctdb_req_message *msg) +{ + struct messaging_rec *result; + DATA_BLOB blob; + enum ndr_err_code ndr_err; + size_t len = msg->hdr.length; + + if (len < offsetof(struct ctdb_req_message, data)) { + return NULL; + } + len -= offsetof(struct ctdb_req_message, data); + + if (len < msg->datalen) { + return NULL; + } + + result = talloc(mem_ctx, struct messaging_rec); + if (result == NULL) { + return NULL; + } + + blob = data_blob_const(msg->data, msg->datalen); + + ndr_err = ndr_pull_struct_blob_all( + &blob, result, result, + (ndr_pull_flags_fn_t)ndr_pull_messaging_rec); + + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + DEBUG(0, ("ndr_pull_struct_blob failed: %s\n", + ndr_errstr(ndr_err))); + TALLOC_FREE(result); + return NULL; + } + + if (DEBUGLEVEL >= 11) { + DEBUG(11, ("ctdb_pull_messaging_rec:\n")); + NDR_PRINT_DEBUG(messaging_rec, result); + } + + return result; +} + +static void messaging_ctdb_recv(struct ctdb_req_message *msg, + void *private_data) +{ + struct messaging_context *msg_ctx = talloc_get_type_abort( + private_data, struct messaging_context); + struct server_id me = messaging_server_id(msg_ctx); + struct messaging_rec *rec; + NTSTATUS status; + struct iovec iov; + + rec = ctdb_pull_messaging_rec(msg_ctx, msg); + if (rec == NULL) { + DEBUG(10, ("%s: ctdb_pull_messaging_rec failed\n", __func__)); + return; + } + + if (!server_id_same_process(&me, &rec->dest)) { + struct server_id_buf id1, id2; + + DEBUG(10, ("%s: I'm %s, ignoring msg to %s\n", __func__, + server_id_str_buf(me, &id1), + server_id_str_buf(rec->dest, &id2))); + TALLOC_FREE(rec); + return; + } + + iov = (struct iovec) { .iov_base = rec->buf.data, + .iov_len = rec->buf.length }; + + status = messaging_send_iov_from(msg_ctx, rec->src, rec->dest, + rec->msg_type, &iov, 1, NULL, 0); + TALLOC_FREE(rec); + + if (!NT_STATUS_IS_OK(status)) { + DEBUG(10, ("%s: messaging_send_iov_from failed: %s\n", + __func__, nt_errstr(status))); + } +} + NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx, TALLOC_CTX *mem_ctx, struct messaging_backend **presult) @@ -165,6 +247,9 @@ NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx, return status; } + status = register_with_ctdbd(ctx->conn, getpid(), + messaging_ctdb_recv, msg_ctx); + global_ctdb_connection_pid = getpid(); global_ctdbd_connection = ctx->conn; talloc_set_destructor(ctx, messaging_ctdbd_destructor); |