summary refs log tree commit diff
path: root/src/networking.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/networking.c')
-rw-r--r-- src/networking.c 295
1 files changed, 230 insertions, 65 deletions
diff --git a/src/networking.c b/src/networking.c
index 05001c564..b05d02b1b 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -131,7 +131,7 @@ client *createClient(connection *conn) {
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
-
+ c->buf = zmalloc(PROTO_REPLY_CHUNK_BYTES);
selectDb(c,0);
uint64_t client_id;
atomicGetIncr(server.next_client_id, client_id, 1);
@@ -140,7 +140,9 @@ client *createClient(connection *conn) {
c->conn = conn;
c->name = NULL;
c->bufpos = 0;
- c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf);
+ c->buf_usable_size = zmalloc_usable_size(c->buf);
+ c->buf_peak = c->buf_usable_size;
+ c->buf_peak_last_reset_time = server.unixtime;
c->ref_repl_buf_node = NULL;
c->ref_block_pos = 0;
c->qb_pos = 0;
@@ -154,7 +156,7 @@ client *createClient(connection *conn) {
c->argv_len_sum = 0;
c->original_argc = 0;
c->original_argv = NULL;
- c->cmd = c->lastcmd = NULL;
+ c->cmd = c->lastcmd = c->realcmd = NULL;
c->multibulklen = 0;
c->bulklen = -1;
c->sentlen = 0;
@@ -173,6 +175,7 @@ client *createClient(connection *conn) {
c->slave_capa = SLAVE_CAPA_NONE;
c->slave_req = SLAVE_REQ_NONE;
c->reply = listCreate();
+ c->deferred_reply_errors = NULL;
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
@@ -313,6 +316,9 @@ size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
size_t reply_len = len > available ? available : len;
memcpy(c->buf+c->bufpos,s,reply_len);
c->bufpos+=reply_len;
+ /* We update the buffer peak after appending the reply to the buffer */
+ if(c->buf_peak < (size_t)c->bufpos)
+ c->buf_peak = (size_t)c->bufpos;
return reply_len;
}
@@ -437,24 +443,46 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
addReplyProto(c,"\r\n",2);
}
-/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
-void afterErrorReply(client *c, const char *s, size_t len) {
- /* Increment the global error counter */
- server.stat_total_error_replies++;
- /* Increment the error stats
- * If the string already starts with "-..." then the error prefix
- * is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". */
- if (s[0] != '-') {
- incrementErrorCount("ERR", 3);
- } else {
- char *spaceloc = memchr(s, ' ', len < 32 ? len : 32);
- if (spaceloc) {
- const size_t errEndPos = (size_t)(spaceloc - s);
- incrementErrorCount(s+1, errEndPos-1);
- } else {
- /* Fallback to ERR if we can't retrieve the error prefix */
+/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.)
+ * Possible flags:
+ * * ERR_REPLY_FLAG_NO_STATS_UPDATE - indicate not to update any error stats. */
+void afterErrorReply(client *c, const char *s, size_t len, int flags) {
+ /* Module clients fall into two categories:
+ * Calls to RM_Call, in which case the error isn't being returned to a client, so should not be counted.
+ * Module thread safe context calls to RM_ReplyWithError, which will be added to a real client by the main thread later. */
+ if (c->flags & CLIENT_MODULE) {
+ if (!c->deferred_reply_errors) {
+ c->deferred_reply_errors = listCreate();
+ listSetFreeMethod(c->deferred_reply_errors, (void (*)(void*))sdsfree);
+ }
+ listAddNodeTail(c->deferred_reply_errors, sdsnewlen(s, len));
+ return;
+ }
+
+ if (!(flags & ERR_REPLY_FLAG_NO_STATS_UPDATE)) {
+ /* Increment the global error counter */
+ server.stat_total_error_replies++;
+ /* Increment the error stats
+ * If the string already starts with "-..." then the error prefix
+ * is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". */
+ if (s[0] != '-') {
incrementErrorCount("ERR", 3);
+ } else {
+ char *spaceloc = memchr(s, ' ', len < 32 ? len : 32);
+ if (spaceloc) {
+ const size_t errEndPos = (size_t)(spaceloc - s);
+ incrementErrorCount(s+1, errEndPos-1);
+ } else {
+ /* Fallback to ERR if we can't retrieve the error prefix */
+ incrementErrorCount("ERR", 3);
+ }
}
+ } else {
+ /* stat_total_error_replies will not be updated, which means that
+ * the cmd stats will not be updated either. We still want this command
+ * to be counted as failed, so we update it here. We use c->realcmd in
+ * case c->cmd was changed (like in GEOADD). */
+ c->realcmd->failed_calls++;
}
/* Sometimes it could be normal that a slave replies to a master with
@@ -500,7 +528,7 @@ void afterErrorReply(client *c, const char *s, size_t len) {
* Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */
void addReplyErrorObject(client *c, robj *err) {
addReply(c, err);
- afterErrorReply(c, err->ptr, sdslen(err->ptr)-2); /* Ignore trailing \r\n */
+ afterErrorReply(c, err->ptr, sdslen(err->ptr)-2, 0); /* Ignore trailing \r\n */
}
/* Sends either a reply or an error reply by checking the first char.
@@ -521,34 +549,57 @@ void addReplyOrErrorObject(client *c, robj *reply) {
/* See addReplyErrorLength for expectations from the input string. */
void addReplyError(client *c, const char *err) {
addReplyErrorLength(c,err,strlen(err));
- afterErrorReply(c,err,strlen(err));
+ afterErrorReply(c,err,strlen(err),0);
+}
+
+/* Add error reply to the given client.
+ * Supported flags:
+ * * ERR_REPLY_FLAG_NO_STATS_UPDATE - indicate not to perform any error stats updates */
+void addReplyErrorSdsEx(client *c, sds err, int flags) {
+ addReplyErrorLength(c,err,sdslen(err));
+ afterErrorReply(c,err,sdslen(err),flags);
+ sdsfree(err);
}
/* See addReplyErrorLength for expectations from the input string. */
/* As a side effect the SDS string is freed. */
void addReplyErrorSds(client *c, sds err) {
- addReplyErrorLength(c,err,sdslen(err));
- afterErrorReply(c,err,sdslen(err));
- sdsfree(err);
+ addReplyErrorSdsEx(c, err, 0);
}
-/* See addReplyErrorLength for expectations from the formatted string.
- * The formatted string is safe to contain \r and \n anywhere. */
-void addReplyErrorFormat(client *c, const char *fmt, ...) {
- va_list ap;
- va_start(ap,fmt);
- sds s = sdscatvprintf(sdsempty(),fmt,ap);
- va_end(ap);
+/* Internal function used by addReplyErrorFormat and addReplyErrorFormatEx.
+ * Refer to afterErrorReply for more information about the flags. */
+static void addReplyErrorFormatInternal(client *c, int flags, const char *fmt, va_list ap) {
+ va_list cpy;
+ va_copy(cpy,ap);
+ sds s = sdscatvprintf(sdsempty(),fmt,cpy);
+ va_end(cpy);
/* Trim any newlines at the end (ones will be added by addReplyErrorLength) */
s = sdstrim(s, "\r\n");
/* Make sure there are no newlines in the middle of the string, otherwise
* invalid protocol is emitted. */
s = sdsmapchars(s, "\r\n", " ", 2);
addReplyErrorLength(c,s,sdslen(s));
- afterErrorReply(c,s,sdslen(s));
+ afterErrorReply(c,s,sdslen(s),flags);
sdsfree(s);
}
+void addReplyErrorFormatEx(client *c, int flags, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap,fmt);
+ addReplyErrorFormatInternal(c, flags, fmt, ap);
+ va_end(ap);
+}
+
+/* See addReplyErrorLength for expectations from the formatted string.
+ * The formatted string is safe to contain \r and \n anywhere. */
+void addReplyErrorFormat(client *c, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap,fmt);
+ addReplyErrorFormatInternal(c, 0, fmt, ap);
+ va_end(ap);
+}
+
void addReplyErrorArity(client *c) {
addReplyErrorFormat(c, "wrong number of arguments for '%s' command",
c->cmd->fullname);
@@ -696,6 +747,24 @@ void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
* we return NULL in addReplyDeferredLen() */
if (node == NULL) return;
+ /* Things like *2\r\n, %3\r\n or ~4\r\n are emitted very often by the protocol,
+ * so we have a few shared objects to use when the integer is small,
+ * as it is most of the time. */
+ const size_t hdr_len = OBJ_SHARED_HDR_STRLEN(length);
+ const int opt_hdr = length < OBJ_SHARED_BULKHDR_LEN;
+ if (prefix == '*' && opt_hdr) {
+ setDeferredReply(c, node, shared.mbulkhdr[length]->ptr, hdr_len);
+ return;
+ }
+ if (prefix == '%' && opt_hdr) {
+ setDeferredReply(c, node, shared.maphdr[length]->ptr, hdr_len);
+ return;
+ }
+ if (prefix == '~' && opt_hdr) {
+ setDeferredReply(c, node, shared.sethdr[length]->ptr, hdr_len);
+ return;
+ }
+
char lenstr[128];
size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
setDeferredReply(c, node, lenstr, lenstr_len);
@@ -788,11 +857,19 @@ void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
/* Things like $3\r\n or *2\r\n are emitted very often by the protocol
* so we have a few shared objects to use if the integer is small
* like it is most of the times. */
- if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
- addReply(c,shared.mbulkhdr[ll]);
+ const int opt_hdr = ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0;
+ const size_t hdr_len = OBJ_SHARED_HDR_STRLEN(ll);
+ if (prefix == '*' && opt_hdr) {
+ addReplyProto(c,shared.mbulkhdr[ll]->ptr,hdr_len);
+ return;
+ } else if (prefix == '$' && opt_hdr) {
+ addReplyProto(c,shared.bulkhdr[ll]->ptr,hdr_len);
+ return;
+ } else if (prefix == '%' && opt_hdr) {
+ addReplyProto(c,shared.maphdr[ll]->ptr,hdr_len);
return;
- } else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
- addReply(c,shared.bulkhdr[ll]);
+ } else if (prefix == '~' && opt_hdr) {
+ addReplyProto(c,shared.sethdr[ll]->ptr,hdr_len);
return;
}
@@ -1024,10 +1101,28 @@ void AddReplyFromClient(client *dst, client *src) {
src->reply_bytes = 0;
src->bufpos = 0;
+ if (src->deferred_reply_errors) {
+ deferredAfterErrorReply(dst, src->deferred_reply_errors);
+ listRelease(src->deferred_reply_errors);
+ src->deferred_reply_errors = NULL;
+ }
+
/* Check output buffer limits */
closeClientOnOutputBufferLimitReached(dst, 1);
}
+/* Append the listed errors to the server error statistics. The input
+ * list is not modified and remains the responsibility of the caller. */
+void deferredAfterErrorReply(client *c, list *errors) {
+ listIter li;
+ listNode *ln;
+ listRewind(errors,&li);
+ while((ln = listNext(&li))) {
+ sds err = ln->value;
+ afterErrorReply(c, err, sdslen(err), 0);
+ }
+}
+
/* Logically copy 'src' replica client buffers info to 'dst' replica.
* Basically increase referenced buffer block node reference count. */
void copyReplicaOutputBuffer(client *dst, client *src) {
@@ -1494,9 +1589,12 @@ void freeClient(client *c) {
/* Free data structures. */
listRelease(c->reply);
+ zfree(c->buf);
freeReplicaReferencedReplBuffer(c);
freeClientArgv(c);
freeClientOriginalArgv(c);
+ if (c->deferred_reply_errors)
+ listRelease(c->deferred_reply_errors);
/* Unlink the client: this will close the socket, remove the I/O
* handlers, and remove references of the client from different
@@ -1658,10 +1756,82 @@ client *lookupClientByID(uint64_t id) {
return (c == raxNotFound) ? NULL : c;
}
+/* This function should be called from _writeToClient when the reply list is not empty.
+ * It gathers the scattered buffers from the reply list and sends them with connWritev.
+ * If we write successfully, it returns C_OK, otherwise C_ERR is returned.
+ * 'nwritten' is an output parameter that receives the number of bytes the server
+ * wrote to the client. */
+static int _writevToClient(client *c, ssize_t *nwritten) {
+ struct iovec iov[IOV_MAX];
+ int iovcnt = 0;
+ size_t iov_bytes_len = 0;
+ /* If the static reply buffer is not empty,
+ * add it to the iov array for writev() as well. */
+ if (c->bufpos > 0) {
+ iov[iovcnt].iov_base = c->buf + c->sentlen;
+ iov[iovcnt].iov_len = c->bufpos - c->sentlen;
+ iov_bytes_len += iov[iovcnt++].iov_len;
+ }
+ /* The first node of reply list might be incomplete from the last call,
+ * thus it needs to be calibrated to get the actual data address and length. */
+ size_t offset = c->bufpos > 0 ? 0 : c->sentlen;
+ listIter iter;
+ listNode *next;
+ clientReplyBlock *o;
+ listRewind(c->reply, &iter);
+ while ((next = listNext(&iter)) && iovcnt < IOV_MAX && iov_bytes_len < NET_MAX_WRITES_PER_EVENT) {
+ o = listNodeValue(next);
+ if (o->used == 0) { /* empty node, just release it and skip. */
+ c->reply_bytes -= o->size;
+ listDelNode(c->reply, next);
+ offset = 0;
+ continue;
+ }
+
+ iov[iovcnt].iov_base = o->buf + offset;
+ iov[iovcnt].iov_len = o->used - offset;
+ iov_bytes_len += iov[iovcnt++].iov_len;
+ offset = 0;
+ }
+ if (iovcnt == 0) return C_OK;
+ *nwritten = connWritev(c->conn, iov, iovcnt);
+ if (*nwritten <= 0) return C_ERR;
+
+ /* Locate the new node which has leftover data and
+ * release all nodes in front of it. */
+ ssize_t remaining = *nwritten;
+ if (c->bufpos > 0) { /* deal with static reply buffer first. */
+ int buf_len = c->bufpos - c->sentlen;
+ c->sentlen += remaining;
+ /* If the buffer was sent, set bufpos to zero to continue with
+ * the remainder of the reply. */
+ if (remaining >= buf_len) {
+ c->bufpos = 0;
+ c->sentlen = 0;
+ }
+ remaining -= buf_len;
+ }
+ listRewind(c->reply, &iter);
+ while (remaining > 0) {
+ next = listNext(&iter);
+ o = listNodeValue(next);
+ if (remaining < (ssize_t)(o->used - c->sentlen)) {
+ c->sentlen += remaining;
+ break;
+ }
+ remaining -= (ssize_t)(o->used - c->sentlen);
+ c->reply_bytes -= o->size;
+ listDelNode(c->reply, next);
+ c->sentlen = 0;
+ }
+
+ return C_OK;
+}
+
/* This function does actual writing output buffers to different types of
* clients, it is called by writeToClient.
- * If we write successfully, it return C_OK, otherwise, C_ERR is returned,
- * And 'nwritten' is a output parameter, it means how many bytes server write
+ * If we write successfully, it returns C_OK, otherwise, C_ERR is returned.
+ * 'nwritten' is an output parameter: the number of bytes the server wrote
* to client. */
int _writeToClient(client *c, ssize_t *nwritten) {
*nwritten = 0;
@@ -1690,8 +1860,18 @@ int _writeToClient(client *c, ssize_t *nwritten) {
return C_OK;
}
- if (c->bufpos > 0) {
- *nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
+ /* When the reply list is not empty, it's better to use writev to save us some
+ * system calls and TCP packets. */
+ if (listLength(c->reply) > 0) {
+ int ret = _writevToClient(c, nwritten);
+ if (ret != C_OK) return ret;
+
+ /* If there are no more objects in the list, we expect
+ * the count of reply bytes to be exactly zero. */
+ if (listLength(c->reply) == 0)
+ serverAssert(c->reply_bytes == 0);
+ } else if (c->bufpos > 0) {
+ *nwritten = connWrite(c->conn, c->buf + c->sentlen, c->bufpos - c->sentlen);
if (*nwritten <= 0) return C_ERR;
c->sentlen += *nwritten;
@@ -1701,31 +1881,8 @@ int _writeToClient(client *c, ssize_t *nwritten) {
c->bufpos = 0;
c->sentlen = 0;
}
- } else {
- clientReplyBlock *o = listNodeValue(listFirst(c->reply));
- size_t objlen = o->used;
-
- if (objlen == 0) {
- c->reply_bytes -= o->size;
- listDelNode(c->reply,listFirst(c->reply));
- return C_OK;
- }
-
- *nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);
- if (*nwritten <= 0) return C_ERR;
- c->sentlen += *nwritten;
+ }
- /* If we fully sent the object on head go to the next one */
- if (c->sentlen == objlen) {
- c->reply_bytes -= o->size;
- listDelNode(c->reply,listFirst(c->reply));
- c->sentlen = 0;
- /* If there are no longer objects in the list, we expect
- * the count of reply bytes to be exactly zero. */
- if (listLength(c->reply) == 0)
- serverAssert(c->reply_bytes == 0);
- }
- }
return C_OK;
}
@@ -1863,6 +2020,10 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
+ if (c->deferred_reply_errors)
+ listRelease(c->deferred_reply_errors);
+ c->deferred_reply_errors = NULL;
+
/* We clear the ASKING flag as well if we are not inside a MULTI, and
* if what we just executed is not the ASKING command itself. */
if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
@@ -2556,7 +2717,7 @@ sds catClientInfoString(sds s, client *client) {
}
sds ret = sdscatfmt(s,
- "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
+ "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U rbs=%U rbp=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
(unsigned long long) client->id,
getClientPeerId(client),
getClientSockname(client),
@@ -2573,6 +2734,8 @@ sds catClientInfoString(sds s, client *client) {
(unsigned long long) sdsavail(client->querybuf),
(unsigned long long) client->argv_len_sum,
(unsigned long long) client->mstate.argv_len_sums,
+ (unsigned long long) client->buf_usable_size,
+ (unsigned long long) client->buf_peak,
(unsigned long long) client->bufpos,
(unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf,
(unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
@@ -2919,6 +3082,7 @@ NULL
else
replyToBlockedClientTimedOut(target);
unblockClient(target);
+ updateStatsOnUnblock(target, 0, 0, 1);
addReply(c,shared.cone);
} else {
addReply(c,shared.czero);
@@ -3414,6 +3578,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
*output_buffer_mem_usage = mem;
mem += sdsZmallocSize(c->querybuf);
mem += zmalloc_size(c);
+ mem += c->buf_usable_size;
/* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
* i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
* spot problematic clients. */