summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKristján Valur Jónsson <sweskman@gmail.com>2022-05-08 11:55:55 +0000
committerGitHub <noreply@github.com>2022-05-08 14:55:55 +0300
commit963843b4379eced5ab20d0682f24438962e9eb0f (patch)
tree6c3e50ef97a174bb4b4be5957506362f4ba02bed
parent5c99e27459a047cd1334e1b87fb0623ac2c881db (diff)
downloadredis-py-963843b4379eced5ab20d0682f24438962e9eb0f.tar.gz
Add unittest for PubSub.connect() (#2167)
* Add unittest for PubSub reconnect * fix linting
-rw-r--r--tests/test_asyncio/test_pubsub.py82
1 files changed, 82 insertions, 0 deletions
diff --git a/tests/test_asyncio/test_pubsub.py b/tests/test_asyncio/test_pubsub.py
index f71ec7e..4037040 100644
--- a/tests/test_asyncio/test_pubsub.py
+++ b/tests/test_asyncio/test_pubsub.py
@@ -1,4 +1,5 @@
import asyncio
+import functools
import sys
from typing import Optional
@@ -20,6 +21,18 @@ from .compat import mock
pytestmark = pytest.mark.asyncio(forbid_global_loop=True)
+def with_timeout(t):
+ def wrapper(corofunc):
+ @functools.wraps(corofunc)
+ async def run(*args, **kwargs):
+ async with async_timeout.timeout(t):
+ return await corofunc(*args, **kwargs)
+
+ return run
+
+ return wrapper
+
+
async def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False):
now = asyncio.get_event_loop().time()
timeout = now + timeout
@@ -604,6 +617,75 @@ class TestPubSubTimeouts:
@pytest.mark.onlynoncluster
+class TestPubSubReconnect:
+ # @pytest.mark.xfail
+ @with_timeout(2)
+ async def test_reconnect_listen(self, r: redis.Redis):
+ """
+ Test that a loop processing PubSub messages can survive
+ a disconnect, by issuing a connect() call.
+ """
+ messages = asyncio.Queue()
+ pubsub = r.pubsub()
+ interrupt = False
+
+ async def loop():
+ # must make sure the task exits
+ async with async_timeout.timeout(2):
+ nonlocal interrupt
+ await pubsub.subscribe("foo")
+ while True:
+ # print("loop")
+ try:
+ try:
+ await pubsub.connect()
+ await loop_step()
+ # print("succ")
+ except redis.ConnectionError:
+ await asyncio.sleep(0.1)
+ except asyncio.CancelledError:
+ # we use a cancel to interrupt the "listen"
+ # when we perform a disconnect
+ # print("cancel", interrupt)
+ if interrupt:
+ interrupt = False
+ else:
+ raise
+
+ async def loop_step():
+ # get a single message via listen()
+ async for message in pubsub.listen():
+ await messages.put(message)
+ break
+
+ task = asyncio.get_event_loop().create_task(loop())
+ # get the initial connect message
+ async with async_timeout.timeout(1):
+ message = await messages.get()
+ assert message == {
+ "channel": b"foo",
+ "data": 1,
+ "pattern": None,
+ "type": "subscribe",
+ }
+ # now, disconnect the connection.
+ await pubsub.connection.disconnect()
+ interrupt = True
+ task.cancel() # interrupt the listen call
+ # await another auto-connect message
+ message = await messages.get()
+ assert message == {
+ "channel": b"foo",
+ "data": 1,
+ "pattern": None,
+ "type": "subscribe",
+ }
+ task.cancel()
+ with pytest.raises(asyncio.CancelledError):
+ await task
+
+
+@pytest.mark.onlynoncluster
class TestPubSubRun:
async def _subscribe(self, p, *args, **kwargs):
await p.subscribe(*args, **kwargs)