summaryrefslogtreecommitdiff
path: root/ctdb/ib
diff options
context:
space:
mode:
authorPeter Somogyi <psomogyi@gamax.hu>2006-12-20 19:16:30 +0100
committerPeter Somogyi <psomogyi@gamax.hu>2006-12-20 19:16:30 +0100
commit2e056a7553971be8bc064a27c8a674a0a325038b (patch)
treef0943fc6e788489197091fe01d236457b682f2db /ctdb/ib
parentefd2903e0fa4f52f42b9ad33274d224db70db4b9 (diff)
downloadsamba-2e056a7553971be8bc064a27c8a674a0a325038b.tar.gz
Added send queue.
TODO: check again & reduce. (This used to be ctdb commit 131c41f6f3e08097e7e0fab852b2a64183c695ec)
Diffstat (limited to 'ctdb/ib')
-rw-r--r--ctdb/ib/ibwrapper.c197
-rw-r--r--ctdb/ib/ibwrapper_internal.h5
2 files changed, 140 insertions, 62 deletions
diff --git a/ctdb/ib/ibwrapper.c b/ctdb/ib/ibwrapper.c
index db6e303638b..dec183f900a 100644
--- a/ctdb/ib/ibwrapper.c
+++ b/ctdb/ib/ibwrapper.c
@@ -88,31 +88,31 @@ static int ibw_init_memory(struct ibw_conn *conn)
{
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
-
+ struct ibw_opts *opts = &pctx->opts;
int i;
struct ibw_wr *p;
pconn->buf_send = ibw_alloc_mr(pctx, pconn,
- pctx->opts.max_send_wr * pctx->opts.avg_send_size, &pconn->mr_send);
+ opts->max_send_wr * opts->avg_send_size, &pconn->mr_send);
if (!pconn->buf_send) {
sprintf(ibw_lasterr, "couldn't allocate work send buf\n");
return -1;
}
pconn->buf_recv = ibw_alloc_mr(pctx, pconn,
- pctx->opts.max_recv_wr * pctx->opts.recv_bufsize, &pconn->mr_recv);
+ opts->max_recv_wr * opts->recv_bufsize, &pconn->mr_recv);
if (!pconn->buf_recv) {
sprintf(ibw_lasterr, "couldn't allocate work recv buf\n");
return -1;
}
- pconn->wr_index = talloc_size(pconn, pctx->opts.max_send_wr * sizeof(struct ibw_wr *));
+ pconn->wr_index = talloc_size(pconn, opts->max_send_wr * sizeof(struct ibw_wr *));
assert(pconn->wr_index!=NULL);
- for(i=0; i<pctx->opts.max_send_wr; i++) {
+ for(i=0; i<opts->max_send_wr; i++) {
p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
- p->msg = pconn->buf_send + (i * pctx->opts.avg_send_size);
- p->wr_id = i;
+ p->msg = pconn->buf_send + (i * opts->avg_send_size);
+ p->wr_id = i + opts->max_recv_wr;
DLIST_ADD(pconn->wr_list_avail, p);
}
@@ -286,7 +286,7 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn)
struct ibv_recv_wr *bad_wr;
list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
- wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
+ wr.wr_id = pconn->recv_index;
pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
@@ -318,7 +318,7 @@ static int ibw_fill_cq(struct ibw_conn *conn)
for(i = pctx->opts.max_recv_wr; i!=0; i--) {
list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
- wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
+ wr.wr_id = pconn->recv_index;
pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
@@ -575,17 +575,58 @@ static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_wr *p;
+ int send_index;
assert(pconn->cm_id->qp->qp_num==wc->qp_num);
- assert(wc->wr_id < pctx->opts.max_send_wr);
-
- p = pconn->wr_index[wc->wr_id];
- if (p->msg_large) {
+ assert(wc->wr_id > pctx->opts.max_recv_wr);
+ send_index = wc->wr_id - pctx->opts.max_recv_wr;
+
+ if (send_index < pctx->opts.max_send_wr) {
+ DEBUG(10, ("ibw_wc_send#1 %u", (int)wc->wr_id));
+ p = pconn->wr_index[send_index];
+ if (p->msg_large)
+ ibw_free_mr(&p->msg_large, &p->mr_large);
+ DLIST_REMOVE(pconn->wr_list_used, p);
+ DLIST_ADD(pconn->wr_list_avail, p);
+ } else {
+ DEBUG(10, ("ibw_wc_send#2 %u", (int)wc->wr_id));
+ for(p=pconn->queue_sent; p!=NULL; p=p->next)
+ if (p->wr_id==(int)wc->wr_id)
+ break;
+ if (p==NULL) {
+ sprintf(ibw_lasterr, "failed to find wr_id %d\n", (int)wc->wr_id);
+ return -1;
+ }
ibw_free_mr(&p->msg_large, &p->mr_large);
+ DLIST_REMOVE(pconn->queue_sent, p);
+ DLIST_ADD(pconn->queue_avail, p);
}
- DLIST_REMOVE(pconn->wr_list_used, p);
- DLIST_ADD(pconn->wr_list_avail, p);
+ if (pconn->queue) {
+ struct ibv_sge list = {
+ .addr = (uintptr_t) NULL,
+ .length = *(uint32_t *)(p->msg_large),
+ .lkey = 0
+ };
+ struct ibv_send_wr wr = {
+ .wr_id = p->wr_id + pctx->opts.max_recv_wr,
+ .sg_list = &list,
+ .num_sge = 1,
+ .opcode = IBV_WR_SEND,
+ .send_flags = IBV_SEND_SIGNALED,
+ };
+ struct ibv_send_wr *bad_wr;
+ int rc;
+
+ p = pconn->queue;
+ DLIST_REMOVE(pconn->queue, p);
+ DLIST_ADD(pconn->queue_sent, p);
+ rc = ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
+ if (rc) {
+ sprintf(ibw_lasterr, "ibv_post_send failed with %d\n", rc);
+ return -1;
+ }
+ }
return 0;
}
@@ -643,19 +684,15 @@ static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
{
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
- int recv_index;
+ struct ibw_part *part = &pconn->part;
char *p;
uint32_t remain;
- struct ibw_part *part;
assert(pconn->cm_id->qp->qp_num==wc->qp_num);
- assert((int)wc->wr_id > pctx->opts.max_send_wr);
- recv_index = (int)wc->wr_id - pctx->opts.max_send_wr;
- assert(recv_index < pctx->opts.max_recv_wr);
+ assert((int)wc->wr_id < pctx->opts.max_recv_wr);
assert(wc->byte_len <= pctx->opts.recv_bufsize);
- p = pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize);
- part = &pconn->part;
+ p = pconn->buf_recv + ((int)wc->wr_id * pctx->opts.recv_bufsize);
remain = wc->byte_len;
while(remain) {
@@ -723,7 +760,6 @@ static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
error:
DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr));
- conn->state = IBWC_ERROR;
return -1;
}
@@ -956,65 +992,102 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n)
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_wr *p = pconn->wr_list_avail;
- if (p==NULL) {
- sprintf(ibw_lasterr, "insufficient wr chunks\n");
- return -1;
- }
-
- DLIST_REMOVE(pconn->wr_list_avail, p);
- DLIST_ADD(pconn->wr_list_used, p);
+ if (p) {
+ DLIST_REMOVE(pconn->wr_list_avail, p);
+ DLIST_ADD(pconn->wr_list_used, p);
- if (n + sizeof(long) <= pctx->opts.avg_send_size) {
- *buf = (void *)(p->msg + sizeof(long));
- *key = (void *)p;
+ if (n + sizeof(long) <= pctx->opts.avg_send_size) {
+ *buf = (void *)(p->msg + sizeof(long));
+ } else {
+ p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
+ if (!p->msg_large) {
+ sprintf(ibw_lasterr, "ibw_alloc_mr#1 failed\n");
+ goto error;
+ }
+ *buf = (void *)(p->msg_large + sizeof(long));
+ }
} else {
+ /* not optimized */
+ p = pconn->queue_avail;
+ if (!p) {
+ p = pconn->queue_avail = talloc_zero(pconn, struct ibw_wr);
+ if (p==NULL) {
+ sprintf(ibw_lasterr, "talloc_zero failed (qmax: %u)", pconn->queue_max);
+ goto error;
+ }
+ p->wr_id = pconn->queue_max + pctx->opts.max_send_wr;
+ pconn->queue_max++;
+ switch(pconn->queue_max) {
+ case 1: DEBUG(2, ("warning: queue performed\n")); break;
+ case 10: DEBUG(0, ("warning: queue reached 10\n")); break;
+ case 100: DEBUG(0, ("warning: queue reached 100\n")); break;
+ case 1000: DEBUG(0, ("warning: queue reached 1000\n")); break;
+ default: break;
+ }
+ }
+ DLIST_REMOVE(pconn->queue_avail, p);
+
p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
if (!p->msg_large) {
- sprintf(ibw_lasterr, "ibw_alloc_send_buf alloc error\n");
- DEBUG(0, (ibw_lasterr));
- return -1;
+ sprintf(ibw_lasterr, "ibw_alloc_mr#2 failed");
+ goto error;
}
*buf = (void *)(p->msg_large + sizeof(long));
}
+ *key = (void *)p;
+
return 0;
+error:
+ DEBUG(0, ("ibw_alloc_send_buf error: %s\n", ibw_lasterr));
+ return -1;
}
+
int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
{
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
- struct ibv_sge list = {
- .addr = (uintptr_t) NULL,
- .length = n,
- .lkey = 0
- };
- struct ibv_send_wr wr = {
- .wr_id = p->wr_id,
- .sg_list = &list,
- .num_sge = 1,
- .opcode = IBV_WR_SEND,
- .send_flags = IBV_SEND_SIGNALED,
- };
- struct ibv_send_wr *bad_wr;
- if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) {
- assert((p->msg + sizeof(long))==(char *)buf);
- list.lkey = pconn->mr_send->lkey;
- list.addr = (uintptr_t) p->msg;
+ if (p->msg!=NULL) {
+ struct ibv_sge list = {
+ .addr = (uintptr_t) NULL,
+ .length = n,
+ .lkey = 0
+ };
+ struct ibv_send_wr wr = {
+ .wr_id = p->wr_id + pctx->opts.max_recv_wr,
+ .sg_list = &list,
+ .num_sge = 1,
+ .opcode = IBV_WR_SEND,
+ .send_flags = IBV_SEND_SIGNALED,
+ };
+ struct ibv_send_wr *bad_wr;
+
+ if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) {
+ assert((p->msg + sizeof(long))==(char *)buf);
+ list.lkey = pconn->mr_send->lkey;
+ list.addr = (uintptr_t) p->msg;
+
+ *((uint32_t *)p->msg) = htonl(n);
+ } else {
+ assert((p->msg_large + sizeof(long))==(char *)buf);
+ assert(p->mr_large!=NULL);
+ list.lkey = p->mr_large->lkey;
+ list.addr = (uintptr_t) p->msg_large;
+
+ *((uint32_t *)p->msg_large) = htonl(n);
+ }
+ return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
+ } /* else: */
- *((uint32_t *)p->msg) = htonl(n);
- } else {
- assert((p->msg_large + sizeof(long))==(char *)buf);
- assert(p->mr_large!=NULL);
- list.lkey = p->mr_large->lkey;
- list.addr = (uintptr_t) p->msg_large;
+ *((uint32_t *)p->msg_large) = htonl(n);
- *((uint32_t *)p->msg_large) = htonl(n);
- }
+ /* to be sent by ibw_wc_send */
+ DLIST_ADD_END(pconn->queue, p, struct ibw_wr *); /* TODO: optimize */
- return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
+ return 0;
}
const char *ibw_getLastError(void)
diff --git a/ctdb/ib/ibwrapper_internal.h b/ctdb/ib/ibwrapper_internal.h
index 6e34917755b..687a5797681 100644
--- a/ctdb/ib/ibwrapper_internal.h
+++ b/ctdb/ib/ibwrapper_internal.h
@@ -79,6 +79,11 @@ struct ibw_conn_priv {
struct ibw_wr *wr_list_used;
struct ibw_wr **wr_index; /* array[0..(qsize-1)] of (ibw_wr *) */
+ struct ibw_wr *queue;
+ struct ibw_wr *queue_sent;
+ struct ibw_wr *queue_avail;
+ int queue_max; /* max wr_id in the queue */
+
/* buf_recv is a ring buffer */
char *buf_recv; /* max_recv_wr * avg_recv_size */
struct ibv_mr *mr_recv;