From 08ff9e80dee211dc1c25e10c7584e98f2e766f5e Mon Sep 17 00:00:00 2001 From: Volker Lendecke Date: Sun, 8 Feb 2015 15:33:39 +0100 Subject: messaging4: Use messages_dgm This replaces the transport mechanism in source4 with calls to the messages_dgm code. It is supposed to enable "smbcontrol samba pool-usage" as an example without having to rewrite smbcontrol using the source4 based messaging subsystem. This moves the source3 based names.tdb (which is unused so far) to the lock directory, source4 does not have a cache directory. Signed-off-by: Volker Lendecke Reviewed-by: Stefan Metzmacher --- source3/lib/messages.c | 2 +- source4/lib/messaging/messaging.c | 442 +++++++++--------------------------- source4/lib/messaging/wscript_build | 2 +- 3 files changed, 108 insertions(+), 338 deletions(-) diff --git a/source3/lib/messages.c b/source3/lib/messages.c index 1f085e07c83..7df7cdb699d 100644 --- a/source3/lib/messages.c +++ b/source3/lib/messages.c @@ -343,7 +343,7 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, } ctx->names_db = server_id_db_init( - ctx, ctx->id, lp_cache_directory(), 0, + ctx, ctx->id, lp_lock_directory(), 0, TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST); if (ctx->names_db == NULL) { DEBUG(10, ("%s: server_id_db_init failed\n", __func__)); diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c index 0b4e109c765..cad2d64233d 100644 --- a/source4/lib/messaging/messaging.c +++ b/source4/lib/messaging/messaging.c @@ -33,6 +33,9 @@ #include "../lib/util/tevent_ntstatus.h" #include "lib/param/param.h" #include "lib/util/server_id_db.h" +#include "../source3/lib/messages_dgm.h" +#include "../source3/lib/messages_dgm_ref.h" +#include "../source3/lib/messages_util.h" #include /* change the message version with any incompatible changes in the protocol */ @@ -51,10 +54,10 @@ struct irpc_request { }; struct imessaging_context { + struct imessaging_context *prev, *next; struct server_id server_id; - struct socket_context *sock; - const char *base_path; - const char *path; + const char *sock_dir; + const char *lock_dir; struct dispatch_fn **dispatch; uint32_t num_types; struct idr_context *dispatch_tree; @@ -64,10 +67,7 @@ struct imessaging_context { struct idr_context *idr; struct server_id_db *names; struct timeval start_time; - struct tevent_timer *retry_te; - struct { - struct tevent_fd *fde; - } event; + void *msg_dgm_ref; }; /* we have a linked list of dispatch handlers for each msg_type that @@ -126,248 +126,20 @@ static NTSTATUS irpc_uptime(struct irpc_message *msg, return NT_STATUS_OK; } -/* - return the path to a messaging socket -*/ -static char *imessaging_path(struct imessaging_context *msg, struct server_id server_id) -{ - struct server_id_buf buf; - - return talloc_asprintf(msg, "%s/msg.%s", msg->base_path, - server_id_str_buf(server_id, &buf)); -} - -/* - dispatch a fully received message - - note that this deliberately can match more than one message handler - per message. That allows a single messasging context to register - (for example) a debug handler for more than one piece of code -*/ -static void imessaging_dispatch(struct imessaging_context *msg, struct imessaging_rec *rec) +static struct dispatch_fn *imessaging_find_dispatch( + struct imessaging_context *msg, uint32_t msg_type) { - struct dispatch_fn *d, *next; - /* temporary IDs use an idtree, the rest use a array of pointers */ - if (rec->header->msg_type >= MSG_TMP_BASE) { - d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, - rec->header->msg_type); - } else if (rec->header->msg_type < msg->num_types) { - d = msg->dispatch[rec->header->msg_type]; - } else { - d = NULL; - } - - for (; d; d = next) { - DATA_BLOB data; - next = d->next; - data.data = rec->packet.data + sizeof(*rec->header); - data.length = rec->header->length; - d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data); - } - rec->header->length = 0; -} - -/* - handler for messages that arrive from other nodes in the cluster -*/ -static void cluster_message_handler(struct imessaging_context *msg, DATA_BLOB packet) -{ - struct imessaging_rec *rec; - - rec = talloc(msg, struct imessaging_rec); - if (rec == NULL) { - smb_panic("Unable to allocate imessaging_rec"); - } - - rec->msg = msg; - rec->path = msg->path; - rec->header = (struct imessaging_header *)packet.data; - rec->packet = packet; - rec->retries = 0; - - if (packet.length != sizeof(*rec->header) + rec->header->length) { - DEBUG(0,("messaging: bad message header size %d should be %d\n", - rec->header->length, (int)(packet.length - sizeof(*rec->header)))); - talloc_free(rec); - return; - } - - imessaging_dispatch(msg, rec); - talloc_free(rec); -} - - - -/* - try to send the message -*/ -static NTSTATUS try_send(struct imessaging_rec *rec) -{ - struct imessaging_context *msg = rec->msg; - size_t nsent; - void *priv; - NTSTATUS status; - struct socket_address *path; - - /* rec->path is the path of the *other* socket, where we want - * this to end up */ - path = socket_address_from_strings(msg, msg->sock->backend_name, - rec->path, 0); - if (!path) { - return NT_STATUS_NO_MEMORY; - } - - /* we send with privileges so messages work from any context */ - priv = root_privileges(); - status = socket_sendto(msg->sock, &rec->packet, &nsent, path); - talloc_free(path); - talloc_free(priv); - - return status; -} - -/* - retry backed off messages -*/ -static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te, - struct timeval t, void *private_data) -{ - struct imessaging_context *msg = talloc_get_type(private_data, - struct imessaging_context); - msg->retry_te = NULL; - - /* put the messages back on the main queue */ - while (msg->retry_queue) { - struct imessaging_rec *rec = msg->retry_queue; - DLIST_REMOVE(msg->retry_queue, rec); - DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *); - } - - TEVENT_FD_WRITEABLE(msg->event.fde); -} - -/* - handle a socket write event -*/ -static void imessaging_send_handler(struct imessaging_context *msg, struct tevent_context *ev) -{ - while (msg->pending) { - struct imessaging_rec *rec = msg->pending; - NTSTATUS status; - status = try_send(rec); - if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { - rec->retries++; - if (rec->retries > 3) { - /* we're getting continuous write errors - - backoff this record */ - DLIST_REMOVE(msg->pending, rec); - DLIST_ADD_END(msg->retry_queue, rec, - struct imessaging_rec *); - if (msg->retry_te == NULL) { - msg->retry_te = - tevent_add_timer(ev, msg, - timeval_current_ofs(1, 0), - msg_retry_timer, msg); - } - } - break; - } - rec->retries = 0; - if (!NT_STATUS_IS_OK(status)) { - TALLOC_CTX *tmp_ctx = talloc_new(msg); - DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", - server_id_str(tmp_ctx, &rec->header->from), - server_id_str(tmp_ctx, &rec->header->to), - rec->header->msg_type, - nt_errstr(status))); - talloc_free(tmp_ctx); - } - DLIST_REMOVE(msg->pending, rec); - talloc_free(rec); - } - if (msg->pending == NULL) { - TEVENT_FD_NOT_WRITEABLE(msg->event.fde); - } -} - -/* - handle a new incoming packet -*/ -static void imessaging_recv_handler(struct imessaging_context *msg, struct tevent_context *ev) -{ - struct imessaging_rec *rec; - NTSTATUS status; - DATA_BLOB packet; - size_t msize; - - /* see how many bytes are in the next packet */ - status = socket_pending(msg->sock, &msize); - if (!NT_STATUS_IS_OK(status)) { - DEBUG(0,("socket_pending failed in messaging - %s\n", - nt_errstr(status))); - return; - } - - packet = data_blob_talloc(msg, NULL, msize); - if (packet.data == NULL) { - /* assume this is temporary and retry */ - return; - } - - status = socket_recv(msg->sock, packet.data, msize, &msize); - if (!NT_STATUS_IS_OK(status)) { - data_blob_free(&packet); - return; - } - - if (msize < sizeof(*rec->header)) { - DEBUG(0,("messaging: bad message of size %d\n", (int)msize)); - data_blob_free(&packet); - return; - } - - rec = talloc(msg, struct imessaging_rec); - if (rec == NULL) { - smb_panic("Unable to allocate imessaging_rec"); - } - - talloc_steal(rec, packet.data); - rec->msg = msg; - rec->path = msg->path; - rec->header = (struct imessaging_header *)packet.data; - rec->packet = packet; - rec->retries = 0; - - if (msize != sizeof(*rec->header) + rec->header->length) { - DEBUG(0,("messaging: bad message header size %d should be %d\n", - rec->header->length, (int)(msize - sizeof(*rec->header)))); - talloc_free(rec); - return; - } - - imessaging_dispatch(msg, rec); - talloc_free(rec); -} - - -/* - handle a socket event -*/ -static void imessaging_handler(struct tevent_context *ev, struct tevent_fd *fde, - uint16_t flags, void *private_data) -{ - struct imessaging_context *msg = talloc_get_type(private_data, - struct imessaging_context); - if (flags & TEVENT_FD_WRITE) { - imessaging_send_handler(msg, ev); + if (msg_type >= MSG_TMP_BASE) { + return (struct dispatch_fn *)idr_find(msg->dispatch_tree, + msg_type); } - if (flags & TEVENT_FD_READ) { - imessaging_recv_handler(msg, ev); + if (msg_type < msg->num_types) { + return msg->dispatch[msg_type]; } + return NULL; } - /* Register a dispatch function for a particular message type. */ @@ -458,64 +230,40 @@ void imessaging_deregister(struct imessaging_context *msg, uint32_t msg_type, vo NTSTATUS imessaging_send(struct imessaging_context *msg, struct server_id server, uint32_t msg_type, const DATA_BLOB *data) { - struct imessaging_rec *rec; - NTSTATUS status; - size_t dlength = data?data->length:0; + uint8_t hdr[MESSAGE_HDR_LENGTH]; + struct iovec iov[2]; + int num_iov, ret; + pid_t pid; + void *priv; - rec = talloc(msg, struct imessaging_rec); - if (rec == NULL) { - return NT_STATUS_NO_MEMORY; + if (!cluster_node_equal(&msg->server_id, &server)) { + /* No cluster in source4... */ + return NT_STATUS_OK; } - rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength); - if (rec->packet.data == NULL) { - talloc_free(rec); - return NT_STATUS_NO_MEMORY; - } + message_hdr_put(hdr, msg_type, msg->server_id, server); - rec->retries = 0; - rec->msg = msg; - rec->header = (struct imessaging_header *)rec->packet.data; - /* zero padding */ - ZERO_STRUCTP(rec->header); - rec->header->version = IMESSAGING_VERSION; - rec->header->msg_type = msg_type; - rec->header->from = msg->server_id; - rec->header->to = server; - rec->header->length = dlength; - if (dlength != 0) { - memcpy(rec->packet.data + sizeof(*rec->header), - data->data, dlength); - } + iov[0] = (struct iovec) { .iov_base = &hdr, .iov_len = sizeof(hdr) }; + num_iov = 1; - if (!cluster_node_equal(&msg->server_id, &server)) { - /* the destination is on another node - dispatch via - the cluster layer */ - status = cluster_message_send(server, &rec->packet); - talloc_free(rec); - return status; + if (data != NULL) { + iov[1] = (struct iovec) { .iov_base = data->data, + .iov_len = data->length }; + num_iov += 1; } - rec->path = imessaging_path(msg, server); - talloc_steal(rec, rec->path); - - if (msg->pending != NULL) { - status = STATUS_MORE_ENTRIES; - } else { - status = try_send(rec); + pid = server.pid; + if (pid == 0) { + pid = getpid(); } - if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { - if (msg->pending == NULL) { - TEVENT_FD_WRITEABLE(msg->event.fde); - } - DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *); - return NT_STATUS_OK; + priv = root_privileges(); + ret = messaging_dgm_send(pid, iov, num_iov, NULL, 0); + TALLOC_FREE(priv); + if (ret != 0) { + return map_nt_error_from_unix_common(ret); } - - talloc_free(rec); - - return status; + return NT_STATUS_OK; } /* @@ -541,12 +289,13 @@ int imessaging_cleanup(struct imessaging_context *msg) if (!msg) { return 0; } - - DEBUG(5,("imessaging: cleaning up %s\n", msg->path)); - unlink(msg->path); return 0; } +static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len, + int *fds, size_t num_fds, + void *private_data); + /* create the listening socket and setup the dispatcher @@ -562,9 +311,8 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx, bool auto_remove) { struct imessaging_context *msg; - NTSTATUS status; - struct socket_address *path; bool ok; + int ret; if (ev == NULL) { return NULL; @@ -575,26 +323,31 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx, return NULL; } - /* setup a handler for messages from other cluster nodes, if appropriate */ - status = cluster_message_init(msg, server_id, cluster_message_handler); - if (!NT_STATUS_IS_OK(status)) { - goto fail; - } - /* create the messaging directory if needed */ - msg->base_path = lpcfg_imessaging_path(msg, lp_ctx); - if (msg->base_path == NULL) { + msg->sock_dir = lpcfg_private_path(msg, lp_ctx, "sock"); + if (msg->sock_dir == NULL) { + goto fail; + } + ok = directory_create_or_exist_strict(msg->sock_dir, geteuid(), 0700); + if (!ok) { goto fail; } - ok = directory_create_or_exist_strict(msg->base_path, geteuid(), 0700); + msg->lock_dir = lpcfg_lock_path(msg, lp_ctx, "msg"); + if (msg->lock_dir == NULL) { + goto fail; + } + ok = directory_create_or_exist_strict(msg->lock_dir, geteuid(), 0755); if (!ok) { goto fail; } - msg->path = imessaging_path(msg, server_id); - if (msg->path == NULL) { + msg->msg_dgm_ref = messaging_dgm_ref( + msg, ev, server_id.unique_id, msg->sock_dir, msg->lock_dir, + imessaging_dgm_recv, msg, &ret); + + if (msg->msg_dgm_ref == NULL) { goto fail; } @@ -612,41 +365,13 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx, msg->start_time = timeval_current(); msg->names = server_id_db_init( - msg, server_id, msg->base_path, 0, + msg, server_id, msg->lock_dir, 0, TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST| lpcfg_tdb_flags(lp_ctx, 0)); if (msg->names == NULL) { goto fail; } - status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0); - if (!NT_STATUS_IS_OK(status)) { - goto fail; - } - - /* by stealing here we ensure that the socket is cleaned up (and even - deleted) on exit */ - talloc_steal(msg, msg->sock); - - path = socket_address_from_strings(msg, msg->sock->backend_name, - msg->path, 0); - if (!path) { - goto fail; - } - - status = socket_listen(msg->sock, path, 50, 0); - if (!NT_STATUS_IS_OK(status)) { - DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status))); - goto fail; - } - - /* it needs to be non blocking for sends */ - set_blocking(socket_get_fd(msg->sock), false); - - msg->event.fde = tevent_add_fd(ev, msg, socket_get_fd(msg->sock), - TEVENT_FD_READ, imessaging_handler, msg); - tevent_fd_set_auto_close(msg->event.fde); - if (auto_remove) { talloc_set_destructor(msg, imessaging_cleanup); } @@ -661,6 +386,51 @@ fail: return NULL; } +static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len, + int *fds, size_t num_fds, + void *private_data) +{ + struct imessaging_context *msg = talloc_get_type_abort( + private_data, struct imessaging_context); + uint32_t msg_type; + struct server_id src, dst; + struct server_id_buf srcbuf, dstbuf; + DATA_BLOB data; + + if (buf_len < MESSAGE_HDR_LENGTH) { + /* Invalid message, ignore */ + return; + } + + message_hdr_get(&msg_type, &src, &dst, buf); + + data.data = discard_const_p(uint8_t, buf + MESSAGE_HDR_LENGTH); + data.length = buf_len - MESSAGE_HDR_LENGTH; + + if ((cluster_id_equal(&dst, &msg->server_id)) || + ((dst.task_id == 0) && (msg->server_id.pid == 0))) { + struct dispatch_fn *d, *next; + + DEBUG(10, ("%s: dst %s matches my id: %s, type=0x%x\n", + __func__, + server_id_str_buf(dst, &dstbuf), + server_id_str_buf(msg->server_id, &srcbuf), + (unsigned)msg_type)); + + d = imessaging_find_dispatch(msg, msg_type); + + for (; d; d = next) { + next = d->next; + d->fn(msg, d->private_data, d->msg_type, src, &data); + } + } else { + DEBUG(10, ("%s: Ignoring type=0x%x dst %s, I am %s, \n", + __func__, (unsigned)msg_type, + server_id_str_buf(dst, &dstbuf), + server_id_str_buf(msg->server_id, &srcbuf))); + } +} + /* A hack, for the short term until we get 'client only' messaging in place */ diff --git a/source4/lib/messaging/wscript_build b/source4/lib/messaging/wscript_build index 48c490e58dc..efe1f79afd6 100644 --- a/source4/lib/messaging/wscript_build +++ b/source4/lib/messaging/wscript_build @@ -3,7 +3,7 @@ bld.SAMBA_LIBRARY('MESSAGING', source='messaging.c', - public_deps='samba-util NDR_IRPC UNIX_PRIVS cluster ndr samba_socket dcerpc server_id_db', + public_deps='samba-util NDR_IRPC UNIX_PRIVS cluster ndr dcerpc messages_util server_id_db', private_library=True ) -- cgit v1.2.1