diff options
author | Peter Somogyi <psomogyi@gamax.hu> | 2006-12-20 17:42:58 +0100 |
---|---|---|
committer | Peter Somogyi <psomogyi@gamax.hu> | 2006-12-20 17:42:58 +0100 |
commit | efd2903e0fa4f52f42b9ad33274d224db70db4b9 (patch) | |
tree | 80bc9772e8f29c1f3351d2c88c44e40557ba72bb /ctdb/ib | |
parent | e667345409e0ca2322a4f0dd8e4d358796b8b661 (diff) | |
parent | 6dbaa5abfcde3c6be286ed8b59ff1fe12665c032 (diff) | |
download | samba-efd2903e0fa4f52f42b9ad33274d224db70db4b9.tar.gz |
Made receiver handle partial packets.
(This used to be ctdb commit 808fd658552e489825fb22453755e225549ebfcc)
Diffstat (limited to 'ctdb/ib')
-rw-r--r-- | ctdb/ib/ibwrapper.c | 254 | ||||
-rw-r--r-- | ctdb/ib/ibwrapper_internal.h | 19 |
2 files changed, 219 insertions, 54 deletions
diff --git a/ctdb/ib/ibwrapper.c b/ctdb/ib/ibwrapper.c index c04505bc474..db6e303638b 100644 --- a/ctdb/ib/ibwrapper.c +++ b/ctdb/ib/ibwrapper.c @@ -49,6 +49,8 @@ static char ibw_lasterr[IBW_LASTERR_BUFSIZE]; static void ibw_event_handler_verbs(struct event_context *ev, struct fd_event *fde, uint16_t flags, void *private_data); static int ibw_fill_cq(struct ibw_conn *conn); +static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc); +static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc); static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn, int n, struct ibv_mr **ppmr) @@ -503,67 +505,61 @@ static void ibw_event_handler_verbs(struct event_context *ev, struct ibv_wc wc; int rc; + struct ibv_cq *ev_cq; + void *ev_ctx; - rc = ibv_poll_cq(pconn->cq, 1, &wc); - if (rc!=1) { - sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc); + /* TODO: check whether if it's good to have more channels here... */ + rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx); + if (rc) { + sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc); goto error; } - if (wc.status) { - sprintf(ibw_lasterr, "cq completion failed status %d\n", - wc.status); + if (ev_cq != pconn->cq) { + sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n", + (unsigned int)ev_cq, (unsigned int)pconn->cq); + goto error; + } + rc = ibv_req_notify_cq(pconn->cq, 0); + if (rc) { + sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc); goto error; } - switch(wc.opcode) { - case IBV_WC_SEND: - { - struct ibw_wr *p; - - DEBUG(10, ("send completion\n")); - assert(pconn->cm_id->qp->qp_num==wc.qp_num); - assert(wc.wr_id < pctx->opts.max_send_wr); - - p = pconn->wr_index[wc.wr_id]; - if (p->msg_large) { - ibw_free_mr(&p->msg_large, &p->mr_large); - } - - DLIST_REMOVE(pconn->wr_list_used, p); - DLIST_ADD(pconn->wr_list_avail, p); + while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) { + if (wc.status) { + sprintf(ibw_lasterr, "cq completion failed status %d\n", + wc.status); + goto error; } - break; - - case IBV_WC_RDMA_WRITE: - DEBUG(10, ("rdma write completion\n")); - break; - case IBV_WC_RDMA_READ: - DEBUG(10, ("rdma read completion\n")); - break; + switch(wc.opcode) { + case IBV_WC_SEND: + DEBUG(10, ("send completion\n")); + if (ibw_wc_send(conn, &wc)) + goto error; + break; - case IBV_WC_RECV: - { - int recv_index; + case IBV_WC_RDMA_WRITE: + DEBUG(10, ("rdma write completion\n")); + break; + + case IBV_WC_RDMA_READ: + DEBUG(10, ("rdma read completion\n")); + break; + case IBV_WC_RECV: DEBUG(10, ("recv completion\n")); - assert(pconn->cm_id->qp->qp_num==wc.qp_num); - assert((int)wc.wr_id > pctx->opts.max_send_wr); - recv_index = (int)wc.wr_id - pctx->opts.max_send_wr; - assert(recv_index < pctx->opts.max_recv_wr); - assert(wc.byte_len <= pctx->opts.recv_bufsize); - -/* TODO: take care of fragmented messages !!! */ - pctx->receive_func(conn, - pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize), - wc.byte_len); - if (ibw_refill_cq_recv(conn)) + if (ibw_wc_recv(conn, &wc)) goto error; - } - break; + break; - default: - sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode); + default: + sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode); + goto error; + } + } + if (rc!=0) { + sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc); goto error; } @@ -574,6 +570,163 @@ error: pctx->connstate_func(NULL, conn); } +static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + struct ibw_wr *p; + + assert(pconn->cm_id->qp->qp_num==wc->qp_num); + assert(wc->wr_id < pctx->opts.max_send_wr); + + p = pconn->wr_index[wc->wr_id]; + if (p->msg_large) { + ibw_free_mr(&p->msg_large, &p->mr_large); + } + + DLIST_REMOVE(pconn->wr_list_used, p); + DLIST_ADD(pconn->wr_list_avail, p); + + return 0; +} + +static inline int ibw_append_to_part(void *memctx, struct ibw_part *part, + char **pp, uint32_t add_len, int info) +{ + /* allocate more if necessary - it's an "evergrowing" buffer... */ + if (part->len + add_len > part->bufsize) { + if (part->buf==NULL) { + assert(part->len==0); + part->buf = talloc_size(memctx, add_len); + if (part->buf==NULL) { + sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n", + add_len, info); + return -1; + } + part->bufsize = add_len; + } else { + part->buf = talloc_realloc_size(memctx, + part->buf, part->len + add_len); + if (part->buf==NULL) { + sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n", + part->len, add_len, info); + return -1; + } + } + part->bufsize = part->len + add_len; + } + + /* consume pp */ + memcpy(part->buf + part->len, *pp, add_len); + *pp += add_len; + part->len += add_len; + part->to_read -= add_len; + + return 0; +} + +static inline int ibw_wc_mem_threshold(void *memctx, struct ibw_part *part, uint32_t threshold) +{ + if (part->bufsize > threshold) { + talloc_free(part->buf); + part->buf = talloc_size(memctx, threshold); + if (part->buf==NULL) { + sprintf(ibw_lasterr, "talloc_size failed\n"); + return -1; + } + part->bufsize = threshold; + } + return 0; +} + +static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc) +{ + struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); + struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); + int recv_index; + char *p; + uint32_t remain; + struct ibw_part *part; + + assert(pconn->cm_id->qp->qp_num==wc->qp_num); + assert((int)wc->wr_id > pctx->opts.max_send_wr); + recv_index = (int)wc->wr_id - pctx->opts.max_send_wr; + assert(recv_index < pctx->opts.max_recv_wr); + assert(wc->byte_len <= pctx->opts.recv_bufsize); + + p = pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize); + part = &pconn->part; + + remain = wc->byte_len; + while(remain) { + /* here always true: (part->len!=0 && part->to_read!=0) || + (part->len==0 && part->to_read==0) */ + if (part->len) { /* is there a partial msg to be continued? */ + int read_len = (part->to_read<=remain) ? part->to_read : remain; + if (ibw_append_to_part(pconn, part, &p, read_len, 421)) + goto error; + remain -= read_len; + + if (part->len<=sizeof(uint32_t) && part->to_read==0) { + assert(part->len==sizeof(uint32_t)); + /* set it again now... */ + part->to_read = *((uint32_t *)(part->buf)); + if (part->to_read<sizeof(uint32_t)) { + sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read); + goto error; + } + part->to_read -= sizeof(uint32_t); /* it's already read */ + } + + if (part->to_read==0) { + pctx->receive_func(conn, part->buf, part->len); + part->len = 0; /* tells not having partial data (any more) */ + if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold)) + goto error; + } + } else { + if (remain>=sizeof(uint32_t)) { + uint32_t msglen = *(uint32_t *)p; + if (msglen<sizeof(uint32_t)) { + sprintf(ibw_lasterr, "got msglen=%u\n", msglen); + goto error; + } + + /* mostly awaited case: */ + if (msglen<=remain) { + pctx->receive_func(conn, p, msglen); + p += msglen; + remain -= msglen; + } else { + part->to_read = msglen; + /* part->len is already 0 */ + if (ibw_append_to_part(pconn, part, &p, remain, 422)) + goto error; + remain = 0; /* to be continued ... */ + /* part->to_read > 0 here */ + } + } else { /* edge case: */ + part->to_read = sizeof(uint32_t); + /* part->len is already 0 */ + if (ibw_append_to_part(pconn, part, &p, remain, 423)) + goto error; + remain = 0; + /* part->to_read > 0 here */ + } + } + } /* <remain> is always decreased at least by 1 */ + + if (ibw_refill_cq_recv(conn)) + goto error; + + return 0; + +error: + DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr)); + conn->state = IBWC_ERROR; + return -1; +} + static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts) { int i; @@ -583,6 +736,7 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i opts->max_recv_wr = 1024; opts->avg_send_size = 1024; opts->recv_bufsize = 256; + opts->recv_threshold = 1 * 1024 * 1024; for(i=0; i<nattr; i++) { name = attr[i].name; @@ -597,6 +751,8 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i opts->avg_send_size = atoi(value); else if (strcmp(name, "recv_bufsize")==0) opts->recv_bufsize = atoi(value); + else if (strcmp(name, "recv_threshold")==0) + opts->recv_threshold = atoi(value); else { sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name); return -1; @@ -843,7 +999,7 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n) }; struct ibv_send_wr *bad_wr; - if (n + sizeof(long)<=pctx->opts.avg_send_size) { + if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) { assert((p->msg + sizeof(long))==(char *)buf); list.lkey = pconn->mr_send->lkey; list.addr = (uintptr_t) p->msg; diff --git a/ctdb/ib/ibwrapper_internal.h b/ctdb/ib/ibwrapper_internal.h index b819c483d3d..6e34917755b 100644 --- a/ctdb/ib/ibwrapper_internal.h +++ b/ctdb/ib/ibwrapper_internal.h @@ -22,10 +22,11 @@ */ struct ibw_opts { - int max_send_wr; - int max_recv_wr; - int avg_send_size; - int recv_bufsize; + uint32_t max_send_wr; + uint32_t max_recv_wr; + uint32_t avg_send_size; + uint32_t recv_bufsize; + uint32_t recv_threshold; }; struct ibw_wr { @@ -56,6 +57,13 @@ struct ibw_ctx_priv { long pagesize; /* sysconf result for memalign */ }; +struct ibw_part { + char *buf; /* talloced memory buffer */ + uint32_t bufsize; /* allocated size of buf - always grows */ + uint32_t len; /* message part length */ + uint32_t to_read; /* 4 or *((uint32_t)buf) if len>=sizeof(uint32_t) */ +}; + struct ibw_conn_priv { struct ibv_comp_channel *verbs_channel; struct fd_event *verbs_channel_event; @@ -74,6 +82,7 @@ struct ibw_conn_priv { /* buf_recv is a ring buffer */ char *buf_recv; /* max_recv_wr * avg_recv_size */ struct ibv_mr *mr_recv; - int recv_index; /* index of the next recv buffer */ + int recv_index; /* index of the next recv buffer when refilling */ + struct ibw_part part; }; |