diff options
Diffstat (limited to 'deps/hiredis/async.c')
-rw-r--r-- | deps/hiredis/async.c | 128 |
1 files changed, 75 insertions, 53 deletions
diff --git a/deps/hiredis/async.c b/deps/hiredis/async.c index 64ab601c9..d73d09fb1 100644 --- a/deps/hiredis/async.c +++ b/deps/hiredis/async.c @@ -47,6 +47,11 @@ #include "async_private.h" +#ifdef NDEBUG +#undef assert +#define assert(e) (void)(e) +#endif + /* Forward declarations of hiredis.c functions */ int __redisAppendCommand(redisContext *c, const char *cmd, size_t len); void __redisSetError(redisContext *c, int type, const char *str); @@ -139,8 +144,8 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { ac->replies.head = NULL; ac->replies.tail = NULL; - ac->sub.invalid.head = NULL; - ac->sub.invalid.tail = NULL; + ac->sub.replies.head = NULL; + ac->sub.replies.tail = NULL; ac->sub.channels = channels; ac->sub.patterns = patterns; @@ -301,36 +306,28 @@ static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) { static void __redisAsyncFree(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb; - dictIterator *it; + dictIterator it; dictEntry *de; /* Execute pending callbacks with NULL reply. */ while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); - - /* Execute callbacks for invalid commands */ - while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) + while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); /* Run subscription callbacks with NULL reply */ if (ac->sub.channels) { - it = dictGetIterator(ac->sub.channels); - if (it != NULL) { - while ((de = dictNext(it)) != NULL) - __redisRunCallback(ac,dictGetEntryVal(de),NULL); - dictReleaseIterator(it); - } + dictInitIterator(&it,ac->sub.channels); + while ((de = dictNext(&it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); dictRelease(ac->sub.channels); } if (ac->sub.patterns) { - it = dictGetIterator(ac->sub.patterns); - if (it != NULL) { - while ((de = dictNext(it)) != NULL) - __redisRunCallback(ac,dictGetEntryVal(de),NULL); - dictReleaseIterator(it); - } + dictInitIterator(&it,ac->sub.patterns); + while ((de = dictNext(&it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); dictRelease(ac->sub.patterns); } @@ -420,10 +417,11 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, char *stype; hisds sname; - /* Custom reply functions are not supported for pub/sub. This will fail - * very hard when they are used... */ - if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) { - assert(reply->elements >= 2); + /* Match reply with the expected format of a pushed message. + * The type and number of elements (3 to 4) are specified at: + * https://redis.io/topics/pubsub#format-of-pushed-messages */ + if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) || + reply->type == REDIS_REPLY_PUSH) { assert(reply->element[0]->type == REDIS_REPLY_STRING); stype = reply->element[0]->str; pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; @@ -462,14 +460,21 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, /* Unset subscribed flag only when no pipelined pending subscribe. */ if (reply->element[2]->integer == 0 && dictSize(ac->sub.channels) == 0 - && dictSize(ac->sub.patterns) == 0) + && dictSize(ac->sub.patterns) == 0) { c->flags &= ~REDIS_SUBSCRIBED; + + /* Move ongoing regular command callbacks. */ + redisCallback cb; + while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) { + __redisPushCallback(&ac->replies,&cb); + } + } } } hi_sdsfree(sname); } else { - /* Shift callback for invalid commands. */ - __redisShiftCallback(&ac->sub.invalid,dstcb); + /* Shift callback for pending command in subscribed context. */ + __redisShiftCallback(&ac->sub.replies,dstcb); } return REDIS_OK; oom: @@ -497,13 +502,12 @@ static int redisIsSubscribeReply(redisReply *reply) { len = reply->element[0]->len - off; return !strncasecmp(str, "subscribe", len) || - !strncasecmp(str, "message", len); - + !strncasecmp(str, "message", len) || + !strncasecmp(str, "unsubscribe", len); } void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); - redisCallback cb = {NULL, NULL, 0, NULL}; void *reply = NULL; int status; @@ -516,17 +520,14 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } - - /* If monitor mode, repush callback */ - if(c->flags & REDIS_MONITORING) { - __redisPushCallback(&ac->replies,&cb); - } - /* When the connection is not being disconnected, simply stop * trying to get replies and wait for the next loop tick. */ break; } + /* Keep track of push message support for subscribe handling */ + if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH; + /* Send any non-subscribe related PUSH messages to our PUSH handler * while allowing subscribe related PUSH messages to pass through. * This allows existing code to be backward compatible and work in @@ -539,6 +540,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) { /* Even if the context is subscribed, pending regular * callbacks will get a reply before pub/sub messages arrive. */ + redisCallback cb = {NULL, NULL, 0, NULL}; if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { /* * A spontaneous reply in a not-subscribed context can be the error @@ -562,15 +564,17 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } - /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */ - assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)); - if(c->flags & REDIS_SUBSCRIBED) + /* No more regular callbacks and no errors, the context *must* be subscribed. */ + assert(c->flags & REDIS_SUBSCRIBED); + if (c->flags & REDIS_SUBSCRIBED) __redisGetSubscribeCallback(ac,reply,&cb); } if (cb.fn != NULL) { __redisRunCallback(ac,&cb,reply); - c->reader->fn->freeObject(reply); + if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){ + c->reader->fn->freeObject(reply); + } /* Proceed with free'ing when redisAsyncFree() was called. */ if (c->flags & REDIS_FREEING) { @@ -584,6 +588,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) { * doesn't know what the server will spit out over the wire. */ c->reader->fn->freeObject(reply); } + + /* If in monitor mode, repush the callback */ + if (c->flags & REDIS_MONITORING) { + __redisPushCallback(&ac->replies,&cb); + } } /* Disconnect when there was an error reading the reply */ @@ -605,7 +614,8 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) { if (redisCheckConnectDone(c, &completed) == REDIS_ERR) { /* Error! */ - redisCheckSocketError(c); + if (redisCheckSocketError(c) == REDIS_ERR) + __redisAsyncCopyError(ac); __redisAsyncHandleConnectFailure(ac); return REDIS_ERR; } else if (completed == 1) { @@ -691,13 +701,22 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb; - if ((c->flags & REDIS_CONNECTED) && ac->replies.head == NULL) { - /* Nothing to do - just an idle timeout */ - return; + if ((c->flags & REDIS_CONNECTED)) { + if (ac->replies.head == NULL && ac->sub.replies.head == NULL) { + /* Nothing to do - just an idle timeout */ + return; + } + + if (!ac->c.command_timeout || + (!ac->c.command_timeout->tv_sec && !ac->c.command_timeout->tv_usec)) { + /* A belated connect timeout arriving, ignore */ + return; + } } if (!c->err) { __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout"); + __redisAsyncCopyError(ac); } if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) { @@ -796,17 +815,19 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void /* (P)UNSUBSCRIBE does not have its own response: every channel or * pattern that is unsubscribed will receive a message. This means we * should not append a callback function for this command. */ - } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) { - /* Set monitor flag and push callback */ - c->flags |= REDIS_MONITORING; - __redisPushCallback(&ac->replies,&cb); + } else if (strncasecmp(cstr,"monitor\r\n",9) == 0) { + /* Set monitor flag and push callback */ + c->flags |= REDIS_MONITORING; + if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) + goto oom; } else { - if (c->flags & REDIS_SUBSCRIBED) - /* This will likely result in an error reply, but it needs to be - * received and passed to the callback. */ - __redisPushCallback(&ac->sub.invalid,&cb); - else - __redisPushCallback(&ac->replies,&cb); + if (c->flags & REDIS_SUBSCRIBED) { + if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK) + goto oom; + } else { + if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) + goto oom; + } } __redisAppendCommand(c,cmd,len); @@ -817,6 +838,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void return REDIS_OK; oom: __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory"); + __redisAsyncCopyError(ac); return REDIS_ERR; } @@ -846,7 +868,7 @@ int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) { hisds cmd; - int len; + long long len; int status; len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen); if (len < 0) |