summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Somogyi <psomogyi@gamax.hu>2007-01-05 18:13:35 +0100
committerPeter Somogyi <psomogyi@gamax.hu>2007-01-05 18:13:35 +0100
commit9c114a3fc51634abdf40b6e471b4775b8eaebac0 (patch)
treee764b2fb4a4f26f194d016dfb9054d83a1252365
parent00df320053e03065cdd7b3a7548e39b7585c4c50 (diff)
downloadsamba-9c114a3fc51634abdf40b6e471b4775b8eaebac0.tar.gz
ibw: modified tridge's code - in my point of view
ibw_alloc_send and node-centric params are the basics of these important changes. Also tried to avoid memcpy/memdup where it was possible. (This used to be ctdb commit 9e8cb9b96c685288c04ee8b69a972f582cd3c904)
-rw-r--r--ctdb/common/ctdb.c11
-rw-r--r--ctdb/common/ctdb_call.c87
-rw-r--r--ctdb/ib/ibw_ctdb_init.c45
-rw-r--r--ctdb/include/ctdb_private.h7
-rw-r--r--ctdb/tcp/tcp_init.c16
5 files changed, 119 insertions, 47 deletions
diff --git a/ctdb/common/ctdb.c b/ctdb/common/ctdb.c
index bd1e150d439..1f53074399f 100644
--- a/ctdb/common/ctdb.c
+++ b/ctdb/common/ctdb.c
@@ -30,14 +30,14 @@
int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
{
int ctdb_tcp_init(struct ctdb_context *ctdb);
-#ifdef HAVE_INFINIBAND
+#ifdef USE_INFINIBAND
int ctdb_ibw_init(struct ctdb_context *ctdb);
#endif /*HAVE_INFINIBAND*/
if (strcmp(transport, "tcp") == 0) {
return ctdb_tcp_init(ctdb);
}
-#ifdef HAVE_INFINIBAND
+#ifdef USE_INFINIBAND
if (strcmp(transport, "ib") == 0) {
return ctdb_ibw_init(ctdb);
}
@@ -256,10 +256,15 @@ void ctdb_wait_loop(struct ctdb_context *ctdb)
}
}
+void ctdb_stopped(struct ctdb_context *ctdb)
+{
+}
+
static const struct ctdb_upcalls ctdb_upcalls = {
.recv_pkt = ctdb_recv_pkt,
.node_dead = ctdb_node_dead,
- .node_connected = ctdb_node_connected
+ .node_connected = ctdb_node_connected,
+ .stopped = ctdb_stopped
};
/*
diff --git a/ctdb/common/ctdb_call.c b/ctdb/common/ctdb_call.c
index 81f3cbdea10..a4e5b35c85f 100644
--- a/ctdb/common/ctdb_call.c
+++ b/ctdb/common/ctdb_call.c
@@ -31,12 +31,10 @@
/*
queue a packet or die
*/
-static void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+static inline void ctdb_queue_packet(struct ctdb_node *node, struct ctdb_req_header *hdr)
{
- struct ctdb_node *node;
- node = ctdb->nodes[hdr->destnode];
- if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
- ctdb_fatal(ctdb, "Unable to queue packet\n");
+ if (node->ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
+ ctdb_fatal(node->ctdb, "Unable to queue packet\n");
}
}
@@ -121,6 +119,9 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
struct ctdb_reply_error *r;
char *msg;
int len;
+ struct ctdb_node *node;
+
+ node = ctdb->nodes[hdr->srcnode];
va_start(ap, fmt);
msg = talloc_vasprintf(ctdb, fmt, ap);
@@ -130,7 +131,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
va_end(ap);
len = strlen(msg)+1;
- r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
+ r = ctdb->methods->allocate_pkt(node, sizeof(*r) + len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r) + len;
r->hdr.operation = CTDB_REPLY_ERROR;
@@ -143,9 +144,8 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
talloc_free(msg);
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(r);
+ ctdb_queue_packet(node, &r->hdr);
+ ctdb->methods->dealloc_pkt(node, r);
}
@@ -157,8 +157,11 @@ static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
struct ctdb_ltdb_header *header)
{
struct ctdb_reply_redirect *r;
+ struct ctdb_node *node;
- r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
+ node = ctdb->nodes[c->hdr.srcnode];
+
+ r = ctdb->methods->allocate_pkt(node, sizeof(*r));
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r);
r->hdr.operation = CTDB_REPLY_REDIRECT;
@@ -167,9 +170,8 @@ static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
r->hdr.reqid = c->hdr.reqid;
r->dmaster = header->dmaster;
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(r);
+ ctdb_queue_packet(node, &r->hdr);
+ ctdb->methods->dealloc_pkt(node, r);
}
/*
@@ -186,13 +188,18 @@ static void ctdb_call_send_dmaster(struct ctdb_context *ctdb,
{
struct ctdb_req_dmaster *r;
int len;
+ struct ctdb_node *node;
+ uint32_t destnode;
+
+ destnode = ctdb_lmaster(ctdb, key);
+ node = ctdb->nodes[destnode];
len = sizeof(*r) + key->dsize + data->dsize;
- r = ctdb->methods->allocate_pkt(ctdb, len);
+ r = ctdb->methods->allocate_pkt(node, len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = len;
r->hdr.operation = CTDB_REQ_DMASTER;
- r->hdr.destnode = ctdb_lmaster(ctdb, key);
+ r->hdr.destnode = destnode;
r->hdr.srcnode = ctdb->vnn;
r->hdr.reqid = c->hdr.reqid;
r->dmaster = header->laccessor;
@@ -205,14 +212,14 @@ static void ctdb_call_send_dmaster(struct ctdb_context *ctdb,
/* we are the lmaster - don't send to ourselves */
ctdb_request_dmaster(ctdb, &r->hdr);
} else {
- ctdb_queue_packet(ctdb, &r->hdr);
+ ctdb_queue_packet(node, &r->hdr);
/* update the ltdb to record the new dmaster */
header->dmaster = r->hdr.destnode;
ctdb_ltdb_store(ctdb, *key, header, *data);
}
- talloc_free(r);
+ ctdb->methods->dealloc_pkt(node, r);
}
@@ -229,7 +236,9 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
TDB_DATA key, data;
struct ctdb_ltdb_header header;
int ret;
+ struct ctdb_node *node;
+ node = ctdb->nodes[c->dmaster];
key.dptr = c->data;
key.dsize = c->keylen;
data.dptr = c->data + c->keylen;
@@ -255,7 +264,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
}
/* send the CTDB_REPLY_DMASTER */
- r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
+ r = ctdb->methods->allocate_pkt(node, sizeof(*r) + data.dsize);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r) + data.dsize;
r->hdr.operation = CTDB_REPLY_DMASTER;
@@ -265,9 +274,8 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
r->datalen = data.dsize;
memcpy(&r->data[0], data.dptr, data.dsize);
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(r);
+ ctdb_queue_packet(node, &r->hdr);
+ ctdb->methods->dealloc_pkt(node, r);
}
@@ -281,7 +289,9 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
struct ctdb_reply_call *r;
int ret;
struct ctdb_ltdb_header header;
+ struct ctdb_node *node;
+ node = ctdb->nodes[hdr->srcnode];
key.dptr = c->data;
key.dsize = c->keylen;
call_data.dptr = c->data + c->keylen;
@@ -317,7 +327,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
call_data.dsize?&call_data:NULL,
&reply_data, c->hdr.srcnode);
- r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
+ r = ctdb->methods->allocate_pkt(node, sizeof(*r) + reply_data.dsize);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r) + reply_data.dsize;
r->hdr.operation = CTDB_REPLY_CALL;
@@ -327,10 +337,10 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
r->datalen = reply_data.dsize;
memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
- ctdb_queue_packet(ctdb, &r->hdr);
+ ctdb_queue_packet(node, &r->hdr);
+ ctdb->methods->dealloc_pkt(node, r);
talloc_free(reply_data.dptr);
- talloc_free(r);
}
enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
@@ -440,7 +450,10 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
struct ctdb_call_state *state;
-
+ struct ctdb_node *node;
+#ifdef USE_INFINIBAND
+ uint8_t *r;
+#endif /* USE_INFINIBAND */
state = idr_find(ctdb->idr, hdr->reqid);
talloc_steal(state, c);
@@ -453,7 +466,18 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
/* send it off again */
state->node = ctdb->nodes[c->dmaster];
- ctdb_queue_packet(ctdb, &state->c->hdr);
+ node = ctdb->nodes[state->c->hdr.destnode];
+
+#ifdef USE_INFINIBAND
+ r = ctdb->methods->allocate_pkt(node, state->c->hdr.length);
+ memcpy(r, &state->c->hdr, state->c->hdr.length);
+#endif /* USE_INFINIBAND */
+
+ ctdb_queue_packet(node, &state->c->hdr);
+
+#ifdef USE_INFINIBAND
+ ctdb->methods->dealloc_pkt(node, r);
+#endif /* USE_INFINIBAND */
}
/*
@@ -520,6 +544,7 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
int ret;
struct ctdb_ltdb_header header;
TDB_DATA data;
+ struct ctdb_node *node;
/*
if we are the dmaster for this key then we don't need to
@@ -538,8 +563,9 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
state = talloc_zero(ctdb, struct ctdb_call_state);
CTDB_NO_MEMORY_NULL(ctdb, state);
+ node = ctdb->nodes[header.dmaster];
len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
- state->c = ctdb->methods->allocate_pkt(ctdb, len);
+ state->c = ctdb->methods->allocate_pkt(node, len);
CTDB_NO_MEMORY_NULL(ctdb, state->c);
state->c->hdr.length = len;
@@ -566,7 +592,12 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
talloc_set_destructor(state, ctdb_call_destructor);
- ctdb_queue_packet(ctdb, &state->c->hdr);
+ ctdb_queue_packet(node, &state->c->hdr);
+
+#ifdef USE_INFINIBAND
+ ctdb->methods->dealloc_pkt(node, state->c);
+ state->c = NULL;
+#endif /* USE_INFINIBAND */
event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
ctdb_call_timeout, state);
diff --git a/ctdb/ib/ibw_ctdb_init.c b/ctdb/ib/ibw_ctdb_init.c
index 1e8b62878d2..672dec6d329 100644
--- a/ctdb/ib/ibw_ctdb_init.c
+++ b/ctdb/ib/ibw_ctdb_init.c
@@ -29,6 +29,9 @@
#include "ibwrapper.h"
#include "ibw_ctdb.h"
+/* not nice; temporary workaround for the current implementation... */
+static void *last_key = NULL;
+
static int ctdb_ibw_listen(struct ctdb_context *ctdb, int backlog)
{
struct ibw_ctx *ictx = talloc_get_type(ctdb->private, struct ibw_ctx);
@@ -108,14 +111,12 @@ static int ctdb_ibw_add_node(struct ctdb_node *node)
/*
* transport packet allocator - allows transport to control memory for packets
*/
-static void *ctdb_ibw_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+static void *ctdb_ibw_allocate_pkt(struct ctdb_node *node, size_t size)
{
struct ibw_conn *conn = NULL;
void *buf = NULL;
- void *key; /* TODO: expand the param list with this */
- /* TODO2: !!! I need "node" or ibw_conn here */
- if (ibw_alloc_send_buf(conn, &buf, &key, (int)size))
+ if (ibw_alloc_send_buf(conn, &buf, &last_key, size))
return NULL;
return buf;
@@ -124,20 +125,40 @@ static void *ctdb_ibw_allocate_pkt(struct ctdb_context *ctdb, size_t size)
static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
{
struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
- void *key = NULL; /* TODO: expand the param list with this */
+ int rc;
+
+ rc = ibw_send(conn, data, last_key, length);
+ last_key = NULL;
+
+ return rc;
+}
- assert(conn!=NULL);
- return ibw_send(conn, data, key, length);
+static void ctdb_ibw_dealloc_pkt(struct ctdb_node *node, void *data)
+{
+ if (last_key) {
+ struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
+
+ assert(conn!=NULL);
+ ibw_cancel_send_buf(conn, data, last_key);
+ } /* else ibw_send is already using it and will free it after completion */
+}
+
+static int ctdb_ibw_stop(struct ctdb_context *cctx)
+{
+ struct ibw_ctx *ictx = talloc_get_type(cctx->private, struct ibw_ctx);
+
+ assert(ictx!=NULL);
+ return ibw_stop(ictx);
}
static const struct ctdb_methods ctdb_ibw_methods = {
.start = ctdb_ibw_start,
.add_node = ctdb_ibw_add_node,
.queue_pkt = ctdb_ibw_queue_pkt,
- .allocate_pkt = ctdb_ibw_allocate_pkt
-
-// .dealloc_pkt = ctdb_ibw_dealloc_pkt
-// .stop = ctdb_ibw_stop
+ .allocate_pkt = ctdb_ibw_allocate_pkt,
+
+ .dealloc_pkt = ctdb_ibw_dealloc_pkt,
+ .stop = ctdb_ibw_stop
};
/*
@@ -146,7 +167,7 @@ static const struct ctdb_methods ctdb_ibw_methods = {
int ctdb_ibw_init(struct ctdb_context *ctdb)
{
struct ibw_ctx *ictx;
-
+
ictx = ibw_init(
NULL, //struct ibw_initattr *attr, /* TODO */
0, //int nattr, /* TODO */
diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h
index a024147c63e..c99bb1a5542 100644
--- a/ctdb/include/ctdb_private.h
+++ b/ctdb/include/ctdb_private.h
@@ -55,7 +55,9 @@ struct ctdb_methods {
int (*start)(struct ctdb_context *); /* start protocol processing */
int (*add_node)(struct ctdb_node *); /* setup a new node */
int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
- void *(*allocate_pkt)(struct ctdb_context *, size_t );
+ void *(*allocate_pkt)(struct ctdb_node *, size_t);
+ void (*dealloc_pkt)(struct ctdb_node *, void *data);
+ int (*stop)(struct ctdb_context *); /* initiate stopping the protocol */
};
/*
@@ -70,6 +72,9 @@ struct ctdb_upcalls {
/* node_connected is called when a connection to a node is established */
void (*node_connected)(struct ctdb_node *);
+
+ /* protocol has been stopped */
+ void (*stopped)(struct ctdb_context *);
};
/* main state of the ctdb daemon */
diff --git a/ctdb/tcp/tcp_init.c b/ctdb/tcp/tcp_init.c
index f261d0c7dac..be0499e079c 100644
--- a/ctdb/tcp/tcp_init.c
+++ b/ctdb/tcp/tcp_init.c
@@ -67,21 +67,31 @@ int ctdb_tcp_add_node(struct ctdb_node *node)
/*
transport packet allocator - allows transport to control memory for packets
*/
-void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+void *ctdb_tcp_allocate_pkt(struct ctdb_node *node, size_t size)
{
/* tcp transport needs to round to 8 byte alignment to ensure
that we can use a length header and 64 bit elements in
structures */
size = (size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
- return talloc_size(ctdb, size);
+ return talloc_size(node, size);
}
+void ctdb_tcp_dealloc_pkt(struct ctdb_node *node, void *buf)
+{
+ talloc_free(buf);
+}
+
+int ctdb_tcp_stop(struct ctdb_context *ctdb)
+{
+ return 0;
+}
static const struct ctdb_methods ctdb_tcp_methods = {
.start = ctdb_tcp_start,
.add_node = ctdb_tcp_add_node,
.queue_pkt = ctdb_tcp_queue_pkt,
- .allocate_pkt = ctdb_tcp_allocate_pkt
+ .allocate_pkt = ctdb_tcp_allocate_pkt,
+ .dealloc_pkt = ctdb_tcp_dealloc_pkt
};
/*