diff options
Diffstat (limited to 'src/blocked.c')
-rw-r--r-- | src/blocked.c | 32 |
1 files changed, 20 insertions, 12 deletions
diff --git a/src/blocked.c b/src/blocked.c index 0feab4a69..aa298cffb 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -108,9 +108,11 @@ void blockClient(client *c, int btype) { /* This function is called after a client has finished a blocking operation * in order to update the total command duration, log the command into * the Slow log if needed, and log the reply duration event if needed. */ -void updateStatsOnUnblock(client *c, long blocked_us, long reply_us){ +void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors){ const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us; c->lastcmd->microseconds += total_cmd_duration; + if (had_errors) + c->lastcmd->failed_calls++; if (server.latency_tracking_enabled) updateCommandLatencyHistogram(&(c->lastcmd->latency_histogram), total_cmd_duration*1000); /* Log the command into the Slow log if needed. */ @@ -314,6 +316,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { * call. */ if (dstkey) incrRefCount(dstkey); + long long prev_error_replies = server.stat_total_error_replies; client *old_client = server.current_client; server.current_client = receiver; monotime replyTimer; @@ -322,7 +325,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { rl->key, dstkey, rl->db, wherefrom, whereto, &deleted); - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); + updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); unblockClient(receiver); afterCommand(receiver); server.current_client = old_client; @@ -366,6 +369,7 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { ? 1 : 0; int reply_nil_when_empty = use_nested_array; + long long prev_error_replies = server.stat_total_error_replies; client *old_client = server.current_client; server.current_client = receiver; monotime replyTimer; @@ -388,7 +392,7 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { decrRefCount(argv[1]); if (count != -1) decrRefCount(argv[2]); - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); + updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); unblockClient(receiver); afterCommand(receiver); server.current_client = old_client; @@ -421,6 +425,12 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key); streamID *gt = &bki->stream_id; + long long prev_error_replies = server.stat_total_error_replies; + client *old_client = server.current_client; + server.current_client = receiver; + monotime replyTimer; + elapsedStart(&replyTimer); + /* If we blocked in the context of a consumer * group, we need to resolve the group and update the * last ID the client is blocked for: this is needed @@ -440,8 +450,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { addReplyError(receiver, "-NOGROUP the consumer group this client " "was blocked on no longer exists"); - unblockClient(receiver); - continue; + goto unblock_receiver; } else { *gt = group->last_id; } @@ -470,10 +479,6 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { } } - client *old_client = server.current_client; - server.current_client = receiver; - monotime replyTimer; - elapsedStart(&replyTimer); /* Emit the two elements sub-array consisting of * the name of the stream and the data we * extracted from it. Wrapped in a single-item @@ -493,11 +498,13 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { streamReplyWithRange(receiver,s,&start,NULL, receiver->bpop.xread_count, 0, group, consumer, noack, &pi); - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); /* Note that after we unblock the client, 'gt' * and other receiver->bpop stuff are no longer * valid, so we must do the setup above before - * this call. */ + * the unblockClient call. */ + +unblock_receiver: + updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); unblockClient(receiver); afterCommand(receiver); server.current_client = old_client; @@ -545,12 +552,13 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { * different modules with different triggers to consider if a key * is ready or not. This means we can't exit the loop but need * to continue after the first failure. */ + long long prev_error_replies = server.stat_total_error_replies; client *old_client = server.current_client; server.current_client = receiver; monotime replyTimer; elapsedStart(&replyTimer); if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue; - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer)); + updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); moduleUnblockClient(receiver); afterCommand(receiver); server.current_client = old_client; |