diff options
Diffstat (limited to 'src/networking.c')
-rw-r--r-- | src/networking.c | 295 |
1 files changed, 230 insertions, 65 deletions
diff --git a/src/networking.c b/src/networking.c index 05001c564..b05d02b1b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -131,7 +131,7 @@ client *createClient(connection *conn) { connSetReadHandler(conn, readQueryFromClient); connSetPrivateData(conn, c); } - + c->buf = zmalloc(PROTO_REPLY_CHUNK_BYTES); selectDb(c,0); uint64_t client_id; atomicGetIncr(server.next_client_id, client_id, 1); @@ -140,7 +140,9 @@ client *createClient(connection *conn) { c->conn = conn; c->name = NULL; c->bufpos = 0; - c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf); + c->buf_usable_size = zmalloc_usable_size(c->buf); + c->buf_peak = c->buf_usable_size; + c->buf_peak_last_reset_time = server.unixtime; c->ref_repl_buf_node = NULL; c->ref_block_pos = 0; c->qb_pos = 0; @@ -154,7 +156,7 @@ client *createClient(connection *conn) { c->argv_len_sum = 0; c->original_argc = 0; c->original_argv = NULL; - c->cmd = c->lastcmd = NULL; + c->cmd = c->lastcmd = c->realcmd = NULL; c->multibulklen = 0; c->bulklen = -1; c->sentlen = 0; @@ -173,6 +175,7 @@ client *createClient(connection *conn) { c->slave_capa = SLAVE_CAPA_NONE; c->slave_req = SLAVE_REQ_NONE; c->reply = listCreate(); + c->deferred_reply_errors = NULL; c->reply_bytes = 0; c->obuf_soft_limit_reached_time = 0; listSetFreeMethod(c->reply,freeClientReplyValue); @@ -313,6 +316,9 @@ size_t _addReplyToBuffer(client *c, const char *s, size_t len) { size_t reply_len = len > available ? available : len; memcpy(c->buf+c->bufpos,s,reply_len); c->bufpos+=reply_len; + /* We update the buffer peak after appending the reply to the buffer */ + if(c->buf_peak < (size_t)c->bufpos) + c->buf_peak = (size_t)c->bufpos; return reply_len; } @@ -437,24 +443,46 @@ void addReplyErrorLength(client *c, const char *s, size_t len) { addReplyProto(c,"\r\n",2); } -/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */ -void afterErrorReply(client *c, const char *s, size_t len) { - /* Increment the global error counter */ - server.stat_total_error_replies++; - /* Increment the error stats - * If the string already starts with "-..." then the error prefix - * is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". */ - if (s[0] != '-') { - incrementErrorCount("ERR", 3); - } else { - char *spaceloc = memchr(s, ' ', len < 32 ? len : 32); - if (spaceloc) { - const size_t errEndPos = (size_t)(spaceloc - s); - incrementErrorCount(s+1, errEndPos-1); - } else { - /* Fallback to ERR if we can't retrieve the error prefix */ +/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) + * Possible flags: + * * ERR_REPLY_FLAG_NO_STATS_UPDATE - indicate not to update any error stats. */ +void afterErrorReply(client *c, const char *s, size_t len, int flags) { + /* Module clients fall into two categories: + * Calls to RM_Call, in which case the error isn't being returned to a client, so should not be counted. + * Module thread safe context calls to RM_ReplyWithError, which will be added to a real client by the main thread later. */ + if (c->flags & CLIENT_MODULE) { + if (!c->deferred_reply_errors) { + c->deferred_reply_errors = listCreate(); + listSetFreeMethod(c->deferred_reply_errors, (void (*)(void*))sdsfree); + } + listAddNodeTail(c->deferred_reply_errors, sdsnewlen(s, len)); + return; + } + + if (!(flags & ERR_REPLY_FLAG_NO_STATS_UPDATE)) { + /* Increment the global error counter */ + server.stat_total_error_replies++; + /* Increment the error stats + * If the string already starts with "-..." then the error prefix + * is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". */ + if (s[0] != '-') { incrementErrorCount("ERR", 3); + } else { + char *spaceloc = memchr(s, ' ', len < 32 ? len : 32); + if (spaceloc) { + const size_t errEndPos = (size_t)(spaceloc - s); + incrementErrorCount(s+1, errEndPos-1); + } else { + /* Fallback to ERR if we can't retrieve the error prefix */ + incrementErrorCount("ERR", 3); + } } + } else { + /* stat_total_error_replies will not be updated, which means that + * the cmd stats will not be updated as well, we still want this command + * to be counted as failed so we update it here. We update c->realcmd in + * case c->cmd was changed (like in GEOADD). */ + c->realcmd->failed_calls++; } /* Sometimes it could be normal that a slave replies to a master with @@ -500,7 +528,7 @@ void afterErrorReply(client *c, const char *s, size_t len) { * Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */ void addReplyErrorObject(client *c, robj *err) { addReply(c, err); - afterErrorReply(c, err->ptr, sdslen(err->ptr)-2); /* Ignore trailing \r\n */ + afterErrorReply(c, err->ptr, sdslen(err->ptr)-2, 0); /* Ignore trailing \r\n */ } /* Sends either a reply or an error reply by checking the first char. @@ -521,34 +549,57 @@ void addReplyOrErrorObject(client *c, robj *reply) { /* See addReplyErrorLength for expectations from the input string. */ void addReplyError(client *c, const char *err) { addReplyErrorLength(c,err,strlen(err)); - afterErrorReply(c,err,strlen(err)); + afterErrorReply(c,err,strlen(err),0); +} + +/* Add error reply to the given client. + * Supported flags: + * * ERR_REPLY_FLAG_NO_STATS_UPDATE - indicate not to perform any error stats updates */ +void addReplyErrorSdsEx(client *c, sds err, int flags) { + addReplyErrorLength(c,err,sdslen(err)); + afterErrorReply(c,err,sdslen(err),flags); + sdsfree(err); } /* See addReplyErrorLength for expectations from the input string. */ /* As a side effect the SDS string is freed. */ void addReplyErrorSds(client *c, sds err) { - addReplyErrorLength(c,err,sdslen(err)); - afterErrorReply(c,err,sdslen(err)); - sdsfree(err); + addReplyErrorSdsEx(c, err, 0); } -/* See addReplyErrorLength for expectations from the formatted string. - * The formatted string is safe to contain \r and \n anywhere. */ -void addReplyErrorFormat(client *c, const char *fmt, ...) { - va_list ap; - va_start(ap,fmt); - sds s = sdscatvprintf(sdsempty(),fmt,ap); - va_end(ap); +/* Internal function used by addReplyErrorFormat and addReplyErrorFormatEx. + * Refer to afterErrorReply for more information about the flags. */ +static void addReplyErrorFormatInternal(client *c, int flags, const char *fmt, va_list ap) { + va_list cpy; + va_copy(cpy,ap); + sds s = sdscatvprintf(sdsempty(),fmt,cpy); + va_end(cpy); /* Trim any newlines at the end (ones will be added by addReplyErrorLength) */ s = sdstrim(s, "\r\n"); /* Make sure there are no newlines in the middle of the string, otherwise * invalid protocol is emitted. */ s = sdsmapchars(s, "\r\n", " ", 2); addReplyErrorLength(c,s,sdslen(s)); - afterErrorReply(c,s,sdslen(s)); + afterErrorReply(c,s,sdslen(s),flags); sdsfree(s); } +void addReplyErrorFormatEx(client *c, int flags, const char *fmt, ...) { + va_list ap; + va_start(ap,fmt); + addReplyErrorFormatInternal(c, flags, fmt, ap); + va_end(ap); +} + +/* See addReplyErrorLength for expectations from the formatted string. + * The formatted string is safe to contain \r and \n anywhere. */ +void addReplyErrorFormat(client *c, const char *fmt, ...) { + va_list ap; + va_start(ap,fmt); + addReplyErrorFormatInternal(c, 0, fmt, ap); + va_end(ap); +} + void addReplyErrorArity(client *c) { addReplyErrorFormat(c, "wrong number of arguments for '%s' command", c->cmd->fullname); @@ -696,6 +747,24 @@ void setDeferredAggregateLen(client *c, void *node, long length, char prefix) { * we return NULL in addReplyDeferredLen() */ if (node == NULL) return; + /* Things like *2\r\n, %3\r\n or ~4\r\n are emitted very often by the protocol + * so we have a few shared objects to use if the integer is small + * like it is most of the times. */ + const size_t hdr_len = OBJ_SHARED_HDR_STRLEN(length); + const int opt_hdr = length < OBJ_SHARED_BULKHDR_LEN; + if (prefix == '*' && opt_hdr) { + setDeferredReply(c, node, shared.mbulkhdr[length]->ptr, hdr_len); + return; + } + if (prefix == '%' && opt_hdr) { + setDeferredReply(c, node, shared.maphdr[length]->ptr, hdr_len); + return; + } + if (prefix == '~' && opt_hdr) { + setDeferredReply(c, node, shared.sethdr[length]->ptr, hdr_len); + return; + } + char lenstr[128]; size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length); setDeferredReply(c, node, lenstr, lenstr_len); @@ -788,11 +857,19 @@ void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) { /* Things like $3\r\n or *2\r\n are emitted very often by the protocol * so we have a few shared objects to use if the integer is small * like it is most of the times. */ - if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) { - addReply(c,shared.mbulkhdr[ll]); + const int opt_hdr = ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0; + const size_t hdr_len = OBJ_SHARED_HDR_STRLEN(ll); + if (prefix == '*' && opt_hdr) { + addReplyProto(c,shared.mbulkhdr[ll]->ptr,hdr_len); + return; + } else if (prefix == '$' && opt_hdr) { + addReplyProto(c,shared.bulkhdr[ll]->ptr,hdr_len); + return; + } else if (prefix == '%' && opt_hdr) { + addReplyProto(c,shared.maphdr[ll]->ptr,hdr_len); return; - } else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) { - addReply(c,shared.bulkhdr[ll]); + } else if (prefix == '~' && opt_hdr) { + addReplyProto(c,shared.sethdr[ll]->ptr,hdr_len); return; } @@ -1024,10 +1101,28 @@ void AddReplyFromClient(client *dst, client *src) { src->reply_bytes = 0; src->bufpos = 0; + if (src->deferred_reply_errors) { + deferredAfterErrorReply(dst, src->deferred_reply_errors); + listRelease(src->deferred_reply_errors); + src->deferred_reply_errors = NULL; + } + /* Check output buffer limits */ closeClientOnOutputBufferLimitReached(dst, 1); } +/* Append the listed errors to the server error statistics. the input + * list is not modified and remains the responsibility of the caller. */ +void deferredAfterErrorReply(client *c, list *errors) { + listIter li; + listNode *ln; + listRewind(errors,&li); + while((ln = listNext(&li))) { + sds err = ln->value; + afterErrorReply(c, err, sdslen(err), 0); + } +} + /* Logically copy 'src' replica client buffers info to 'dst' replica. * Basically increase referenced buffer block node reference count. */ void copyReplicaOutputBuffer(client *dst, client *src) { @@ -1494,9 +1589,12 @@ void freeClient(client *c) { /* Free data structures. */ listRelease(c->reply); + zfree(c->buf); freeReplicaReferencedReplBuffer(c); freeClientArgv(c); freeClientOriginalArgv(c); + if (c->deferred_reply_errors) + listRelease(c->deferred_reply_errors); /* Unlink the client: this will close the socket, remove the I/O * handlers, and remove references of the client from different @@ -1658,10 +1756,82 @@ client *lookupClientByID(uint64_t id) { return (c == raxNotFound) ? NULL : c; } +/* This function should be called from _writeToClient when the reply list is not empty, + * it gathers the scattered buffers from reply list and sends them away with connWritev. + * If we write successfully, it returns C_OK, otherwise, C_ERR is returned, + * and 'nwritten' is an output parameter, it means how many bytes server write + * to client. */ +static int _writevToClient(client *c, ssize_t *nwritten) { + struct iovec iov[IOV_MAX]; + int iovcnt = 0; + size_t iov_bytes_len = 0; + /* If the static reply buffer is not empty, + * add it to the iov array for writev() as well. */ + if (c->bufpos > 0) { + iov[iovcnt].iov_base = c->buf + c->sentlen; + iov[iovcnt].iov_len = c->bufpos - c->sentlen; + iov_bytes_len += iov[iovcnt++].iov_len; + } + /* The first node of reply list might be incomplete from the last call, + * thus it needs to be calibrated to get the actual data address and length. */ + size_t offset = c->bufpos > 0 ? 0 : c->sentlen; + listIter iter; + listNode *next; + clientReplyBlock *o; + listRewind(c->reply, &iter); + while ((next = listNext(&iter)) && iovcnt < IOV_MAX && iov_bytes_len < NET_MAX_WRITES_PER_EVENT) { + o = listNodeValue(next); + if (o->used == 0) { /* empty node, just release it and skip. */ + c->reply_bytes -= o->size; + listDelNode(c->reply, next); + offset = 0; + continue; + } + + iov[iovcnt].iov_base = o->buf + offset; + iov[iovcnt].iov_len = o->used - offset; + iov_bytes_len += iov[iovcnt++].iov_len; + offset = 0; + } + if (iovcnt == 0) return C_OK; + *nwritten = connWritev(c->conn, iov, iovcnt); + if (*nwritten <= 0) return C_ERR; + + /* Locate the new node which has leftover data and + * release all nodes in front of it. */ + ssize_t remaining = *nwritten; + if (c->bufpos > 0) { /* deal with static reply buffer first. */ + int buf_len = c->bufpos - c->sentlen; + c->sentlen += remaining; + /* If the buffer was sent, set bufpos to zero to continue with + * the remainder of the reply. */ + if (remaining >= buf_len) { + c->bufpos = 0; + c->sentlen = 0; + } + remaining -= buf_len; + } + listRewind(c->reply, &iter); + while (remaining > 0) { + next = listNext(&iter); + o = listNodeValue(next); + if (remaining < (ssize_t)(o->used - c->sentlen)) { + c->sentlen += remaining; + break; + } + remaining -= (ssize_t)(o->used - c->sentlen); + c->reply_bytes -= o->size; + listDelNode(c->reply, next); + c->sentlen = 0; + } + + return C_OK; +} + /* This function does actual writing output buffers to different types of * clients, it is called by writeToClient. - * If we write successfully, it return C_OK, otherwise, C_ERR is returned, - * And 'nwritten' is a output parameter, it means how many bytes server write + * If we write successfully, it returns C_OK, otherwise, C_ERR is returned, + * and 'nwritten' is an output parameter, it means how many bytes server write * to client. */ int _writeToClient(client *c, ssize_t *nwritten) { *nwritten = 0; @@ -1690,8 +1860,18 @@ int _writeToClient(client *c, ssize_t *nwritten) { return C_OK; } - if (c->bufpos > 0) { - *nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); + /* When the reply list is not empty, it's better to use writev to save us some + * system calls and TCP packets. */ + if (listLength(c->reply) > 0) { + int ret = _writevToClient(c, nwritten); + if (ret != C_OK) return ret; + + /* If there are no longer objects in the list, we expect + * the count of reply bytes to be exactly zero. */ + if (listLength(c->reply) == 0) + serverAssert(c->reply_bytes == 0); + } else if (c->bufpos > 0) { + *nwritten = connWrite(c->conn, c->buf + c->sentlen, c->bufpos - c->sentlen); if (*nwritten <= 0) return C_ERR; c->sentlen += *nwritten; @@ -1701,31 +1881,8 @@ int _writeToClient(client *c, ssize_t *nwritten) { c->bufpos = 0; c->sentlen = 0; } - } else { - clientReplyBlock *o = listNodeValue(listFirst(c->reply)); - size_t objlen = o->used; - - if (objlen == 0) { - c->reply_bytes -= o->size; - listDelNode(c->reply,listFirst(c->reply)); - return C_OK; - } - - *nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen); - if (*nwritten <= 0) return C_ERR; - c->sentlen += *nwritten; + } - /* If we fully sent the object on head go to the next one */ - if (c->sentlen == objlen) { - c->reply_bytes -= o->size; - listDelNode(c->reply,listFirst(c->reply)); - c->sentlen = 0; - /* If there are no longer objects in the list, we expect - * the count of reply bytes to be exactly zero. */ - if (listLength(c->reply) == 0) - serverAssert(c->reply_bytes == 0); - } - } return C_OK; } @@ -1863,6 +2020,10 @@ void resetClient(client *c) { c->multibulklen = 0; c->bulklen = -1; + if (c->deferred_reply_errors) + listRelease(c->deferred_reply_errors); + c->deferred_reply_errors = NULL; + /* We clear the ASKING flag as well if we are not inside a MULTI, and * if what we just executed is not the ASKING command itself. */ if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand) @@ -2556,7 +2717,7 @@ sds catClientInfoString(sds s, client *client) { } sds ret = sdscatfmt(s, - "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", + "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U rbs=%U rbp=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", (unsigned long long) client->id, getClientPeerId(client), getClientSockname(client), @@ -2573,6 +2734,8 @@ sds catClientInfoString(sds s, client *client) { (unsigned long long) sdsavail(client->querybuf), (unsigned long long) client->argv_len_sum, (unsigned long long) client->mstate.argv_len_sums, + (unsigned long long) client->buf_usable_size, + (unsigned long long) client->buf_peak, (unsigned long long) client->bufpos, (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf, (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */ @@ -2919,6 +3082,7 @@ NULL else replyToBlockedClientTimedOut(target); unblockClient(target); + updateStatsOnUnblock(target, 0, 0, 1); addReply(c,shared.cone); } else { addReply(c,shared.czero); @@ -3414,6 +3578,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { *output_buffer_mem_usage = mem; mem += sdsZmallocSize(c->querybuf); mem += zmalloc_size(c); + mem += c->buf_usable_size; /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to * spot problematic clients. */ |