summaryrefslogtreecommitdiff
path: root/ctdb/ib/ibwrapper.c
diff options
context:
space:
mode:
authorAndrew Tridgell <tridge@samba.org>2006-12-20 10:35:27 +1100
committerAndrew Tridgell <tridge@samba.org>2006-12-20 10:35:27 +1100
commite667345409e0ca2322a4f0dd8e4d358796b8b661 (patch)
treef43c007f0c6deedd8de910d8ea8aeb659c6bbe38 /ctdb/ib/ibwrapper.c
parent9a2ca05372631481042fc2ca0a2d394caf6e68fe (diff)
parent96035955be8bd445a924f3553f24c019e0921e0f (diff)
downloadsamba-e667345409e0ca2322a4f0dd8e4d358796b8b661.tar.gz
merge from Peter
(This used to be ctdb commit 19bbfc7eeaccb795782647c743690cf5c131796d)
Diffstat (limited to 'ctdb/ib/ibwrapper.c')
-rw-r--r--ctdb/ib/ibwrapper.c181
1 files changed, 113 insertions, 68 deletions
diff --git a/ctdb/ib/ibwrapper.c b/ctdb/ib/ibwrapper.c
index b70b6caad6b..c04505bc474 100644
--- a/ctdb/ib/ibwrapper.c
+++ b/ctdb/ib/ibwrapper.c
@@ -50,6 +50,37 @@ static void ibw_event_handler_verbs(struct event_context *ev,
struct fd_event *fde, uint16_t flags, void *private_data);
static int ibw_fill_cq(struct ibw_conn *conn);
+static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
+ int n, struct ibv_mr **ppmr)
+{
+ void *buf;
+ buf = memalign(pctx->pagesize, n);
+ if (!buf) {
+ sprintf(ibw_lasterr, "couldn't allocate memory\n");
+ return NULL;
+ }
+
+ *ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE);
+ if (!*ppmr) {
+ sprintf(ibw_lasterr, "couldn't allocate mr\n");
+ free(buf);
+ return NULL;
+ }
+
+ return buf;
+}
+
+static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr)
+{
+ if (*ppmr!=NULL) {
+ ibv_dereg_mr(*ppmr);
+ *ppmr = NULL;
+ }
+ if (*ppbuf) {
+ free(*ppbuf);
+ *ppbuf = NULL;
+ }
+}
static int ibw_init_memory(struct ibw_conn *conn)
{
@@ -59,23 +90,26 @@ static int ibw_init_memory(struct ibw_conn *conn)
int i;
struct ibw_wr *p;
- pconn->buf = memalign(pctx->pagesize, pctx->max_msg_size);
- if (!pconn->buf) {
- sprintf(ibw_lasterr, "couldn't allocate work buf\n");
+ pconn->buf_send = ibw_alloc_mr(pctx, pconn,
+ pctx->opts.max_send_wr * pctx->opts.avg_send_size, &pconn->mr_send);
+ if (!pconn->buf_send) {
+ sprintf(ibw_lasterr, "couldn't allocate work send buf\n");
return -1;
}
- pconn->mr = ibv_reg_mr(pctx->pd, pconn->buf,
- pctx->qsize * pctx->max_msg_size, IBV_ACCESS_LOCAL_WRITE);
- if (!pconn->mr) {
- sprintf(ibw_lasterr, "couldn't allocate mr\n");
+
+ pconn->buf_recv = ibw_alloc_mr(pctx, pconn,
+ pctx->opts.max_recv_wr * pctx->opts.recv_bufsize, &pconn->mr_recv);
+ if (!pconn->buf_recv) {
+ sprintf(ibw_lasterr, "couldn't allocate work recv buf\n");
return -1;
}
- pconn->wr_index = talloc_size(pconn, pctx->qsize * sizeof(struct ibw_wr *));
+ pconn->wr_index = talloc_size(pconn, pctx->opts.max_send_wr * sizeof(struct ibw_wr *));
+ assert(pconn->wr_index!=NULL);
- for(i=0; i<pctx->qsize; i++) {
+ for(i=0; i<pctx->opts.max_send_wr; i++) {
p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
- p->msg = pconn->buf + (i * pctx->max_msg_size);
+ p->msg = pconn->buf_send + (i * pctx->opts.avg_send_size);
p->wr_id = i;
DLIST_ADD(pconn->wr_list_avail, p);
@@ -117,14 +151,8 @@ static int ibw_ctx_destruct(struct ibw_ctx *ctx)
static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
{
/* free memory regions */
- if (pconn->mr) {
- ibv_dereg_mr(pconn->mr);
- pconn->mr = NULL;
- }
- if (pconn->buf) {
- free(pconn->buf); /* memalign-ed */
- pconn->buf = NULL;
- }
+ ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
+ ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
/* pconn->wr_index is freed by talloc */
/* pconn->wr_index[i] are freed by talloc */
@@ -204,7 +232,8 @@ static int ibw_setup_cq_qp(struct ibw_conn *conn)
pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
/* init cq */
- pconn->cq = ibv_create_cq(pconn->cm_id->verbs, pctx->qsize,
+ pconn->cq = ibv_create_cq(pconn->cm_id->verbs,
+ pctx->opts.max_recv_wr + pctx->opts.max_send_wr,
conn, pconn->verbs_channel, 0);
if (pconn->cq==NULL) {
sprintf(ibw_lasterr, "ibv_create_cq failed\n");
@@ -244,8 +273,8 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn)
int rc;
struct ibv_sge list = {
.addr = (uintptr_t) NULL,
- .length = pctx->max_msg_size,
- .lkey = pconn->mr->lkey
+ .length = pctx->opts.recv_bufsize,
+ .lkey = pconn->mr_recv->lkey
};
struct ibv_recv_wr wr = {
.wr_id = 0,
@@ -253,17 +282,10 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn)
.num_sge = 1,
};
struct ibv_recv_wr *bad_wr;
- struct ibw_wr *p = pconn->wr_list_avail;
- if (p==NULL) {
- sprintf(ibw_lasterr, "out of wr_list_avail");
- DEBUG(0, (ibw_lasterr));
- return -1;
- }
- DLIST_REMOVE(pconn->wr_list_avail, p);
- DLIST_ADD(pconn->wr_list_used, p);
- list.addr = (uintptr_t) p->msg;
- wr.wr_id = p->wr_id;
+ list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
+ wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
+ pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
if (rc) {
@@ -282,8 +304,8 @@ static int ibw_fill_cq(struct ibw_conn *conn)
int i, rc;
struct ibv_sge list = {
.addr = (uintptr_t) NULL,
- .length = pctx->max_msg_size,
- .lkey = pconn->mr->lkey
+ .length = pctx->opts.recv_bufsize,
+ .lkey = pconn->mr_recv->lkey
};
struct ibv_recv_wr wr = {
.wr_id = 0,
@@ -291,19 +313,11 @@ static int ibw_fill_cq(struct ibw_conn *conn)
.num_sge = 1,
};
struct ibv_recv_wr *bad_wr;
- struct ibw_wr *p;
for(i = pctx->opts.max_recv_wr; i!=0; i--) {
- p = pconn->wr_list_avail;
- if (p==NULL) {
- sprintf(ibw_lasterr, "out of wr_list_avail");
- DEBUG(0, (ibw_lasterr));
- return -1;
- }
- DLIST_REMOVE(pconn->wr_list_avail, p);
- DLIST_ADD(pconn->wr_list_used, p);
- list.addr = (uintptr_t) p->msg;
- wr.wr_id = p->wr_id;
+ list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
+ wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
+ pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
if (rc) {
@@ -508,8 +522,13 @@ static void ibw_event_handler_verbs(struct event_context *ev,
DEBUG(10, ("send completion\n"));
assert(pconn->cm_id->qp->qp_num==wc.qp_num);
- assert(wc.wr_id < pctx->qsize);
+ assert(wc.wr_id < pctx->opts.max_send_wr);
+
p = pconn->wr_index[wc.wr_id];
+ if (p->msg_large) {
+ ibw_free_mr(&p->msg_large, &p->mr_large);
+ }
+
DLIST_REMOVE(pconn->wr_list_used, p);
DLIST_ADD(pconn->wr_list_avail, p);
}
@@ -525,19 +544,19 @@ static void ibw_event_handler_verbs(struct event_context *ev,
case IBV_WC_RECV:
{
- struct ibw_wr *p;
-
- assert(pconn->cm_id->qp->qp_num==wc.qp_num);
- assert(wc.wr_id < pctx->qsize);
- p = pconn->wr_index[wc.wr_id];
-
- DLIST_REMOVE(pconn->wr_list_used, p);
- DLIST_ADD(pconn->wr_list_avail, p);
-
+ int recv_index;
+
DEBUG(10, ("recv completion\n"));
- assert(wc.byte_len <= pctx->max_msg_size);
-
- pctx->receive_func(conn, p->msg, wc.byte_len);
+ assert(pconn->cm_id->qp->qp_num==wc.qp_num);
+ assert((int)wc.wr_id > pctx->opts.max_send_wr);
+ recv_index = (int)wc.wr_id - pctx->opts.max_send_wr;
+ assert(recv_index < pctx->opts.max_recv_wr);
+ assert(wc.byte_len <= pctx->opts.recv_bufsize);
+
+/* TODO: take care of fragmented messages !!! */
+ pctx->receive_func(conn,
+ pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize),
+ wc.byte_len);
if (ibw_refill_cq_recv(conn))
goto error;
}
@@ -562,6 +581,8 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
opts->max_send_wr = 256;
opts->max_recv_wr = 1024;
+ opts->avg_send_size = 1024;
+ opts->recv_bufsize = 256;
for(i=0; i<nattr; i++) {
name = attr[i].name;
@@ -572,6 +593,10 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
opts->max_send_wr = atoi(value);
else if (strcmp(name, "max_recv_wr")==0)
opts->max_recv_wr = atoi(value);
+ else if (strcmp(name, "avg_send_size")==0)
+ opts->avg_send_size = atoi(value);
+ else if (strcmp(name, "recv_bufsize")==0)
+ opts->recv_bufsize = atoi(value);
else {
sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
return -1;
@@ -584,8 +609,7 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
void *ctx_userdata,
ibw_connstate_fn_t ibw_connstate,
ibw_receive_fn_t ibw_receive,
- struct event_context *ectx,
- int max_msg_size)
+ struct event_context *ectx)
{
struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx);
struct ibw_ctx_priv *pctx;
@@ -640,8 +664,6 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
DEBUG(10, ("created pd %p\n", pctx->pd));
pctx->pagesize = sysconf(_SC_PAGESIZE);
- pctx->qsize = pctx->opts.max_send_wr + pctx->opts.max_recv_wr;
- pctx->max_msg_size = max_msg_size;
return ctx;
/* don't put code here */
@@ -772,8 +794,9 @@ int ibw_disconnect(struct ibw_conn *conn)
return 0;
}
-int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key)
+int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n)
{
+ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_wr *p = pconn->wr_list_avail;
@@ -785,8 +808,18 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key)
DLIST_REMOVE(pconn->wr_list_avail, p);
DLIST_ADD(pconn->wr_list_used, p);
- *buf = (void *)p->msg;
- *key = (void *)p;
+ if (n + sizeof(long) <= pctx->opts.avg_send_size) {
+ *buf = (void *)(p->msg + sizeof(long));
+ *key = (void *)p;
+ } else {
+ p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
+ if (!p->msg_large) {
+ sprintf(ibw_lasterr, "ibw_alloc_send_buf alloc error\n");
+ DEBUG(0, (ibw_lasterr));
+ return -1;
+ }
+ *buf = (void *)(p->msg_large + sizeof(long));
+ }
return 0;
}
@@ -797,9 +830,9 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
struct ibv_sge list = {
- .addr = (uintptr_t) p->msg,
+ .addr = (uintptr_t) NULL,
.length = n,
- .lkey = pconn->mr->lkey
+ .lkey = 0
};
struct ibv_send_wr wr = {
.wr_id = p->wr_id,
@@ -810,8 +843,20 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
};
struct ibv_send_wr *bad_wr;
- assert(p->msg==(char *)buf);
- assert(n<=pctx->max_msg_size);
+ if (n + sizeof(long)<=pctx->opts.avg_send_size) {
+ assert((p->msg + sizeof(long))==(char *)buf);
+ list.lkey = pconn->mr_send->lkey;
+ list.addr = (uintptr_t) p->msg;
+
+ *((uint32_t *)p->msg) = htonl(n);
+ } else {
+ assert((p->msg_large + sizeof(long))==(char *)buf);
+ assert(p->mr_large!=NULL);
+ list.lkey = p->mr_large->lkey;
+ list.addr = (uintptr_t) p->msg_large;
+
+ *((uint32_t *)p->msg_large) = htonl(n);
+ }
return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
}