summaryrefslogtreecommitdiff
path: root/src/blocked.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/blocked.c')
-rw-r--r--src/blocked.c32
1 files changed, 20 insertions, 12 deletions
diff --git a/src/blocked.c b/src/blocked.c
index 0feab4a69..aa298cffb 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -108,9 +108,11 @@ void blockClient(client *c, int btype) {
/* This function is called after a client has finished a blocking operation
* in order to update the total command duration, log the command into
* the Slow log if needed, and log the reply duration event if needed. */
-void updateStatsOnUnblock(client *c, long blocked_us, long reply_us){
+void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors){
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
c->lastcmd->microseconds += total_cmd_duration;
+ if (had_errors)
+ c->lastcmd->failed_calls++;
if (server.latency_tracking_enabled)
updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), total_cmd_duration*1000);
/* Log the command into the Slow log if needed. */
@@ -314,6 +316,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
* call. */
if (dstkey) incrRefCount(dstkey);
+ long long prev_error_replies = server.stat_total_error_replies;
client *old_client = server.current_client;
server.current_client = receiver;
monotime replyTimer;
@@ -322,7 +325,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
rl->key, dstkey, rl->db,
wherefrom, whereto,
&deleted);
- updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
+ updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
unblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;
@@ -366,6 +369,7 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
? 1 : 0;
int reply_nil_when_empty = use_nested_array;
+ long long prev_error_replies = server.stat_total_error_replies;
client *old_client = server.current_client;
server.current_client = receiver;
monotime replyTimer;
@@ -388,7 +392,7 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
decrRefCount(argv[1]);
if (count != -1) decrRefCount(argv[2]);
- updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
+ updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
unblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;
@@ -421,6 +425,12 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key);
streamID *gt = &bki->stream_id;
+ long long prev_error_replies = server.stat_total_error_replies;
+ client *old_client = server.current_client;
+ server.current_client = receiver;
+ monotime replyTimer;
+ elapsedStart(&replyTimer);
+
/* If we blocked in the context of a consumer
* group, we need to resolve the group and update the
* last ID the client is blocked for: this is needed
@@ -440,8 +450,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
addReplyError(receiver,
"-NOGROUP the consumer group this client "
"was blocked on no longer exists");
- unblockClient(receiver);
- continue;
+ goto unblock_receiver;
} else {
*gt = group->last_id;
}
@@ -470,10 +479,6 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
}
}
- client *old_client = server.current_client;
- server.current_client = receiver;
- monotime replyTimer;
- elapsedStart(&replyTimer);
/* Emit the two elements sub-array consisting of
* the name of the stream and the data we
* extracted from it. Wrapped in a single-item
@@ -493,11 +498,13 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count,
0, group, consumer, noack, &pi);
- updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
/* Note that after we unblock the client, 'gt'
* and other receiver->bpop stuff are no longer
* valid, so we must do the setup above before
- * this call. */
+ * the unblockClient call. */
+
+unblock_receiver:
+ updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
unblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;
@@ -545,12 +552,13 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
* different modules with different triggers to consider if a key
* is ready or not. This means we can't exit the loop but need
* to continue after the first failure. */
+ long long prev_error_replies = server.stat_total_error_replies;
client *old_client = server.current_client;
server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
- updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
+ updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
moduleUnblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;