summaryrefslogtreecommitdiff
path: root/source3
diff options
context:
space:
mode:
authorVolker Lendecke <vl@samba.org>2015-05-19 16:55:32 +0200
committerVolker Lendecke <vl@samba.org>2015-05-28 11:13:09 +0200
commita37398b9de835eaf95237cf5c8ade41d0800a37c (patch)
tree96e3ad52e6ff2dcbb09ba75e21f27dbbd6422d3d /source3
parent24eb3659e3ee5a262a80aea1b36a8e5ef3c7be99 (diff)
downloadsamba-a37398b9de835eaf95237cf5c8ade41d0800a37c.tar.gz
ctdbd_conn: Move message handling out of ctdbd_conn.c
This also removes the deferred message handling. It's no longer required, because the messaging_send_iov_from always goes through the kernel which takes at least one round through tevent. Signed-off-by: Volker Lendecke <vl@samba.org> Reviewed-by: Stefan Metzmacher <metze@samba.org>
Diffstat (limited to 'source3')
-rw-r--r--source3/lib/ctdbd_conn.c141
-rw-r--r--source3/lib/messages_ctdbd.c85
2 files changed, 85 insertions, 141 deletions
diff --git a/source3/lib/ctdbd_conn.c b/source3/lib/ctdbd_conn.c
index a0dbd32434f..4e030184b22 100644
--- a/source3/lib/ctdbd_conn.c
+++ b/source3/lib/ctdbd_conn.c
@@ -130,20 +130,6 @@ NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
return NT_STATUS_OK;
}
-static bool ctdb_is_our_srvid(struct ctdbd_connection *conn, uint64_t srvid)
-{
- size_t i, num_callbacks;
-
- num_callbacks = talloc_array_length(conn->callbacks);
-
- for (i=0; i<num_callbacks; i++) {
- if (srvid == conn->callbacks[i].srvid) {
- return true;
- }
- }
- return false;
-}
-
static void ctdbd_msg_call_back(struct ctdbd_connection *conn,
struct ctdb_req_message *msg)
{
@@ -292,78 +278,6 @@ static int ctdbd_connect(int *pfd)
return 0;
}
-/*
- * State necessary to defer an incoming message while we are waiting for a
- * ctdb reply.
- */
-
-struct deferred_msg_state {
- struct messaging_context *msg_ctx;
- struct messaging_rec *rec;
-};
-
-/*
- * Timed event handler for the deferred message
- */
-
-static void deferred_message_dispatch(struct tevent_context *event_ctx,
- struct tevent_timer *te,
- struct timeval now,
- void *private_data)
-{
- struct deferred_msg_state *state = talloc_get_type_abort(
- private_data, struct deferred_msg_state);
-
- messaging_dispatch_rec(state->msg_ctx, state->rec);
- TALLOC_FREE(state);
- TALLOC_FREE(te);
-}
-
-/*
- * Fetch a messaging_rec from an incoming ctdb style message
- */
-
-static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx,
- size_t overall_length,
- struct ctdb_req_message *msg)
-{
- struct messaging_rec *result;
- DATA_BLOB blob;
- enum ndr_err_code ndr_err;
-
- if ((overall_length < offsetof(struct ctdb_req_message, data))
- || (overall_length
- < offsetof(struct ctdb_req_message, data) + msg->datalen)) {
-
- cluster_fatal("got invalid msg length");
- }
-
- if (!(result = talloc(mem_ctx, struct messaging_rec))) {
- DEBUG(0, ("talloc failed\n"));
- return NULL;
- }
-
- blob = data_blob_const(msg->data, msg->datalen);
-
- ndr_err = ndr_pull_struct_blob(
- &blob, result, result,
- (ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
-
- if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
- DEBUG(0, ("ndr_pull_struct_blob failed: %s\n",
- ndr_errstr(ndr_err)));
- TALLOC_FREE(result);
- return NULL;
- }
-
- if (DEBUGLEVEL >= 11) {
- DEBUG(11, ("ctdb_pull_messaging_rec:\n"));
- NDR_PRINT_DEBUG(messaging_rec, result);
- }
-
- return result;
-}
-
static NTSTATUS ctdb_read_packet(int fd, TALLOC_CTX *mem_ctx,
struct ctdb_req_header **result)
{
@@ -447,8 +361,6 @@ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
ctdb_packet_dump(hdr);
if (hdr->operation == CTDB_REQ_MESSAGE) {
- struct tevent_timer *evt;
- struct deferred_msg_state *msg_state;
struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
if (conn->msg_ctx == NULL) {
@@ -497,42 +409,7 @@ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
}
ctdbd_msg_call_back(conn, msg);
-
- msg_state = talloc(NULL, struct deferred_msg_state);
- if (msg_state == NULL) {
- DEBUG(0, ("talloc failed\n"));
- TALLOC_FREE(hdr);
- goto next_pkt;
- }
-
- if (!(msg_state->rec = ctdb_pull_messaging_rec(
- msg_state, msg->hdr.length, msg))) {
- DEBUG(0, ("ctdbd_pull_messaging_rec failed\n"));
- TALLOC_FREE(msg_state);
- TALLOC_FREE(hdr);
- goto next_pkt;
- }
-
TALLOC_FREE(hdr);
-
- msg_state->msg_ctx = conn->msg_ctx;
-
- /*
- * We're waiting for a call reply, but an async message has
- * crossed. Defer dispatching to the toplevel event loop.
- */
- evt = tevent_add_timer(messaging_tevent_context(conn->msg_ctx),
- messaging_tevent_context(conn->msg_ctx),
- timeval_zero(),
- deferred_message_dispatch,
- msg_state);
- if (evt == NULL) {
- DEBUG(0, ("event_add_timed failed\n"));
- TALLOC_FREE(msg_state);
- TALLOC_FREE(hdr);
- goto next_pkt;
- }
-
goto next_pkt;
}
@@ -626,11 +503,6 @@ NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx,
return status;
}
- status = register_with_ctdbd(conn, (uint64_t)getpid(), NULL, NULL);
- if (!NT_STATUS_IS_OK(status)) {
- goto fail;
- }
-
status = register_with_ctdbd(conn, MSG_SRVID_SAMBA, NULL, NULL);
if (!NT_STATUS_IS_OK(status)) {
goto fail;
@@ -668,7 +540,6 @@ static NTSTATUS ctdb_handle_message(struct messaging_context *msg_ctx,
struct ctdb_req_header *hdr)
{
struct ctdb_req_message *msg;
- struct messaging_rec *msg_rec;
if (hdr->operation != CTDB_REQ_MESSAGE) {
DEBUG(0, ("Received async msg of type %u, discarding\n",
@@ -717,18 +588,6 @@ static NTSTATUS ctdb_handle_message(struct messaging_context *msg_ctx,
ctdbd_msg_call_back(conn, msg);
- if (!ctdb_is_our_srvid(conn, msg->srvid)) {
- DEBUG(0,("Got unexpected message with srvid=%llu\n",
- (unsigned long long)msg->srvid));
- return NT_STATUS_OK;
- }
-
- msg_rec = ctdb_pull_messaging_rec(talloc_tos(), msg->hdr.length, msg);
- if (msg_rec == NULL) {
- DEBUG(10, ("ctdb_pull_messaging_rec failed\n"));
- return NT_STATUS_NO_MEMORY;
- }
- messaging_dispatch_rec(conn->msg_ctx, msg_rec);
return NT_STATUS_OK;
}
diff --git a/source3/lib/messages_ctdbd.c b/source3/lib/messages_ctdbd.c
index 799780e6a14..430dc51dbff 100644
--- a/source3/lib/messages_ctdbd.c
+++ b/source3/lib/messages_ctdbd.c
@@ -128,6 +128,88 @@ static int messaging_ctdbd_destructor(struct messaging_ctdbd_context *ctx)
return 0;
}
+static struct messaging_rec *ctdb_pull_messaging_rec(
+ TALLOC_CTX *mem_ctx, const struct ctdb_req_message *msg)
+{
+ struct messaging_rec *result;
+ DATA_BLOB blob;
+ enum ndr_err_code ndr_err;
+ size_t len = msg->hdr.length;
+
+ if (len < offsetof(struct ctdb_req_message, data)) {
+ return NULL;
+ }
+ len -= offsetof(struct ctdb_req_message, data);
+
+ if (len < msg->datalen) {
+ return NULL;
+ }
+
+ result = talloc(mem_ctx, struct messaging_rec);
+ if (result == NULL) {
+ return NULL;
+ }
+
+ blob = data_blob_const(msg->data, msg->datalen);
+
+ ndr_err = ndr_pull_struct_blob_all(
+ &blob, result, result,
+ (ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
+
+ if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+ DEBUG(0, ("ndr_pull_struct_blob failed: %s\n",
+ ndr_errstr(ndr_err)));
+ TALLOC_FREE(result);
+ return NULL;
+ }
+
+ if (DEBUGLEVEL >= 11) {
+ DEBUG(11, ("ctdb_pull_messaging_rec:\n"));
+ NDR_PRINT_DEBUG(messaging_rec, result);
+ }
+
+ return result;
+}
+
+static void messaging_ctdb_recv(struct ctdb_req_message *msg,
+ void *private_data)
+{
+ struct messaging_context *msg_ctx = talloc_get_type_abort(
+ private_data, struct messaging_context);
+ struct server_id me = messaging_server_id(msg_ctx);
+ struct messaging_rec *rec;
+ NTSTATUS status;
+ struct iovec iov;
+
+ rec = ctdb_pull_messaging_rec(msg_ctx, msg);
+ if (rec == NULL) {
+ DEBUG(10, ("%s: ctdb_pull_messaging_rec failed\n", __func__));
+ return;
+ }
+
+ if (!server_id_same_process(&me, &rec->dest)) {
+ struct server_id_buf id1, id2;
+
+ DEBUG(10, ("%s: I'm %s, ignoring msg to %s\n", __func__,
+ server_id_str_buf(me, &id1),
+ server_id_str_buf(rec->dest, &id2)));
+ TALLOC_FREE(rec);
+ return;
+ }
+
+ iov = (struct iovec) { .iov_base = rec->buf.data,
+ .iov_len = rec->buf.length };
+
+ status = messaging_send_iov_from(msg_ctx, rec->src, rec->dest,
+ rec->msg_type, &iov, 1, NULL, 0);
+ TALLOC_FREE(rec);
+
+ if (!NT_STATUS_IS_OK(status)) {
+ DEBUG(10, ("%s: messaging_send_iov_from failed: %s\n",
+ __func__, nt_errstr(status)));
+ }
+}
+
NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
TALLOC_CTX *mem_ctx,
struct messaging_backend **presult)
@@ -165,6 +247,9 @@ NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
return status;
}
+ status = register_with_ctdbd(ctx->conn, getpid(),
+ messaging_ctdb_recv, msg_ctx);
+
global_ctdb_connection_pid = getpid();
global_ctdbd_connection = ctx->conn;
talloc_set_destructor(ctx, messaging_ctdbd_destructor);