summary | refs | log | tree | commit | diff
path: root/ctdb/tcp
diff options
context:
space:
mode:
author: Andrew Tridgell <tridge@samba.org> 2007-04-10 19:33:21 +1000
committer: Andrew Tridgell <tridge@samba.org> 2007-04-10 19:33:21 +1000
commit: f1e0174e8314143408c1aa6637bafafdd445c9f9 (patch)
tree: 075799dcf41acce4a291a5771bf23236a75571d6 /ctdb/tcp
parent: 82b712d80f81bb85f04b8ac636442bd0512853a7 (diff)
download: samba-f1e0174e8314143408c1aa6637bafafdd445c9f9.tar.gz
made all sockets handle partial IO
abstract IO via ctdb_queue_*() functions (This used to be ctdb commit 636ae76f4632b29231db87be32c9114f58b37840)
Diffstat (limited to 'ctdb/tcp')
-rw-r--r-- ctdb/tcp/ctdb_tcp.h 20
-rw-r--r-- ctdb/tcp/tcp_connect.c 35
-rw-r--r-- ctdb/tcp/tcp_io.c 136
3 files changed, 38 insertions, 153 deletions
diff --git a/ctdb/tcp/ctdb_tcp.h b/ctdb/tcp/ctdb_tcp.h
index 5b6cd299b92..4fa496cd5ac 100644
--- a/ctdb/tcp/ctdb_tcp.h
+++ b/ctdb/tcp/ctdb_tcp.h
@@ -30,17 +30,7 @@ struct ctdb_tcp {
struct ctdb_incoming {
struct ctdb_context *ctdb;
int fd;
- struct ctdb_partial partial;
-};
-
-/*
- outgoing packet structure - only allocated when we can't write immediately
- to the socket
-*/
-struct ctdb_tcp_packet {
- struct ctdb_tcp_packet *next, *prev;
- uint8_t *data;
- uint32_t length;
+ struct ctdb_queue *queue;
};
/*
@@ -48,19 +38,15 @@ struct ctdb_tcp_packet {
*/
struct ctdb_tcp_node {
int fd;
- struct fd_event *fde;
- struct ctdb_tcp_packet *queue;
+ struct ctdb_queue *queue;
};
/* prototypes internal to tcp transport */
-void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private);
-void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private);
int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length);
int ctdb_tcp_listen(struct ctdb_context *ctdb);
void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private);
+void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args);
#define CTDB_TCP_ALIGNMENT 8
diff --git a/ctdb/tcp/tcp_connect.c b/ctdb/tcp/tcp_connect.c
index 85fffc2f703..bccc8a63aa0 100644
--- a/ctdb/tcp/tcp_connect.c
+++ b/ctdb/tcp/tcp_connect.c
@@ -35,6 +35,24 @@ static void set_nonblocking(int fd)
/*
+ called when a complete packet has come in - should not happen on this socket
+ */
+void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private)
+{
+ struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
+ struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
+ struct ctdb_tcp_node);
+
+ /* start a new connect cycle to try to re-establish the
+ link */
+ close(tnode->fd);
+ ctdb_queue_set_fd(tnode->queue, -1);
+ tnode->fd = -1;
+ event_add_timed(node->ctdb->ev, node, timeval_zero(),
+ ctdb_tcp_node_connect, node);
+}
+
+/*
called when socket becomes writeable on connect
*/
static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde,
@@ -59,17 +77,14 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f
}
talloc_free(fde);
- tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ,
- ctdb_tcp_node_write, node);
+
+ setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one));
+
+ tnode->queue = ctdb_queue_setup(node->ctdb, node, tnode->fd, CTDB_TCP_ALIGNMENT,
+ ctdb_tcp_tnode_cb, node);
/* tell the ctdb layer we are connected */
node->ctdb->upcalls->node_connected(node);
-
- setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one));
-
- if (tnode->queue) {
- EVENT_FD_WRITEABLE(tnode->fde);
- }
}
@@ -177,8 +192,8 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
set_nonblocking(in->fd);
- event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ,
- ctdb_tcp_incoming_read, in);
+ in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT,
+ ctdb_tcp_read_cb, in);
talloc_set_destructor(in, ctdb_incoming_destructor);
}
diff --git a/ctdb/tcp/tcp_io.c b/ctdb/tcp/tcp_io.c
index 22771669869..8ec0a1e538c 100644
--- a/ctdb/tcp/tcp_io.c
+++ b/ctdb/tcp/tcp_io.c
@@ -29,81 +29,19 @@
/*
- called when we fail to send a message to a node
-*/
-static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private)
+ called when a complete packet has come in
+ */
+void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args)
{
- struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
- struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
- struct ctdb_tcp_node);
-
- /* start a new connect cycle to try to re-establish the
- link */
- talloc_free(tnode->fde);
- close(tnode->fd);
- tnode->fd = -1;
- event_add_timed(node->ctdb->ev, node, timeval_zero(),
- ctdb_tcp_node_connect, node);
-}
+ struct ctdb_incoming *in = talloc_get_type(args, struct ctdb_incoming);
+ struct ctdb_req_header *hdr;
-/*
- called when socket becomes readable
-*/
-void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private)
-{
- struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
- struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
- struct ctdb_tcp_node);
- if (flags & EVENT_FD_READ) {
- /* getting a read event on this fd in the current tcp model is
- always an error, as we have separate read and write
- sockets. In future we may combine them, but for now it must
- mean that the socket is dead, so we try to reconnect */
- node->ctdb->upcalls->node_dead(node);
- talloc_free(tnode->fde);
- close(tnode->fd);
- tnode->fd = -1;
- event_add_timed(node->ctdb->ev, node, timeval_zero(),
- ctdb_tcp_node_connect, node);
+ if (data == NULL) {
+ /* incoming socket has died */
+ talloc_free(in);
return;
}
- while (tnode->queue) {
- struct ctdb_tcp_packet *pkt = tnode->queue;
- ssize_t n;
-
- n = write(tnode->fd, pkt->data, pkt->length);
-
- if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
- event_add_timed(node->ctdb->ev, node, timeval_zero(),
- ctdb_tcp_node_dead, node);
- EVENT_FD_NOT_WRITEABLE(tnode->fde);
- return;
- }
- if (n <= 0) return;
-
- if (n != pkt->length) {
- pkt->length -= n;
- pkt->data += n;
- return;
- }
-
- DLIST_REMOVE(tnode->queue, pkt);
- talloc_free(pkt);
- }
-
- EVENT_FD_NOT_WRITEABLE(tnode->fde);
-}
-
-
-
-static void tcp_read_cb(uint8_t *data, int cnt, void *args)
-{
- struct ctdb_incoming *in = talloc_get_type(args, struct ctdb_incoming);
- struct ctdb_req_header *hdr;
-
if (cnt < sizeof(*hdr)) {
ctdb_set_error(in->ctdb, "Bad packet length %d\n", cnt);
return;
@@ -131,65 +69,11 @@ static void tcp_read_cb(uint8_t *data, int cnt, void *args)
}
/*
- called when an incoming connection is readable
-*/
-void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private)
-{
- struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
-
- ctdb_read_pdu(in->fd, in, &in->partial, tcp_read_cb, in);
-}
-
-/*
queue a packet for sending
*/
int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
{
- struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
+ struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
- struct ctdb_tcp_packet *pkt;
- uint32_t length2;
-
- /* enforce the length and alignment rules from the tcp packet allocator */
- length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
- *(uint32_t *)data = length2;
-
- if (length2 != length) {
- memset(data+length, 0, length2-length);
- }
-
- /* if the queue is empty then try an immediate write, avoiding
- queue overhead. This relies on non-blocking sockets */
- if (tnode->queue == NULL && tnode->fd != -1) {
- ssize_t n = write(tnode->fd, data, length2);
- if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
- event_add_timed(node->ctdb->ev, node, timeval_zero(),
- ctdb_tcp_node_dead, node);
- /* yes, we report success, as the dead node is
- handled via a separate event */
- return 0;
- }
- if (n > 0) {
- data += n;
- length2 -= n;
- }
- if (length2 == 0) return 0;
- }
-
- pkt = talloc(tnode, struct ctdb_tcp_packet);
- CTDB_NO_MEMORY(node->ctdb, pkt);
-
- pkt->data = talloc_memdup(pkt, data, length2);
- CTDB_NO_MEMORY(node->ctdb, pkt->data);
-
- pkt->length = length2;
-
- if (tnode->queue == NULL && tnode->fd != -1) {
- EVENT_FD_WRITEABLE(tnode->fde);
- }
-
- DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);
-
- return 0;
+ return ctdb_queue_send(tnode->queue, data, length);
}