diff options
author | Kristján Valur Jónsson <sweskman@gmail.com> | 2022-05-08 11:55:55 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-08 14:55:55 +0300 |
commit | 963843b4379eced5ab20d0682f24438962e9eb0f (patch) | |
tree | 6c3e50ef97a174bb4b4be5957506362f4ba02bed | |
parent | 5c99e27459a047cd1334e1b87fb0623ac2c881db (diff) | |
download | redis-py-963843b4379eced5ab20d0682f24438962e9eb0f.tar.gz |
Add unittest for PubSub.connect() (#2167)
* Add unittest for PubSub reconnect
* fix linting
-rw-r--r-- | tests/test_asyncio/test_pubsub.py | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/tests/test_asyncio/test_pubsub.py b/tests/test_asyncio/test_pubsub.py index f71ec7e..4037040 100644 --- a/tests/test_asyncio/test_pubsub.py +++ b/tests/test_asyncio/test_pubsub.py @@ -1,4 +1,5 @@ import asyncio +import functools import sys from typing import Optional @@ -20,6 +21,18 @@ from .compat import mock pytestmark = pytest.mark.asyncio(forbid_global_loop=True) +def with_timeout(t): + def wrapper(corofunc): + @functools.wraps(corofunc) + async def run(*args, **kwargs): + async with async_timeout.timeout(t): + return await corofunc(*args, **kwargs) + + return run + + return wrapper + + async def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False): now = asyncio.get_event_loop().time() timeout = now + timeout @@ -604,6 +617,75 @@ class TestPubSubTimeouts: @pytest.mark.onlynoncluster +class TestPubSubReconnect: + # @pytest.mark.xfail + @with_timeout(2) + async def test_reconnect_listen(self, r: redis.Redis): + """ + Test that a loop processing PubSub messages can survive + a disconnect, by issuing a connect() call. + """ + messages = asyncio.Queue() + pubsub = r.pubsub() + interrupt = False + + async def loop(): + # must make sure the task exits + async with async_timeout.timeout(2): + nonlocal interrupt + await pubsub.subscribe("foo") + while True: + # print("loop") + try: + try: + await pubsub.connect() + await loop_step() + # print("succ") + except redis.ConnectionError: + await asyncio.sleep(0.1) + except asyncio.CancelledError: + # we use a cancel to interrupt the "listen" + # when we perform a disconnect + # print("cancel", interrupt) + if interrupt: + interrupt = False + else: + raise + + async def loop_step(): + # get a single message via listen() + async for message in pubsub.listen(): + await messages.put(message) + break + + task = asyncio.get_event_loop().create_task(loop()) + # get the initial connect message + async with async_timeout.timeout(1): + message = await messages.get() + assert message == { + "channel": b"foo", + "data": 1, + "pattern": None, + "type": "subscribe", + } + # now, disconnect the connection. + await pubsub.connection.disconnect() + interrupt = True + task.cancel() # interrupt the listen call + # await another auto-connect message + message = await messages.get() + assert message == { + "channel": b"foo", + "data": 1, + "pattern": None, + "type": "subscribe", + } + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + +@pytest.mark.onlynoncluster class TestPubSubRun: async def _subscribe(self, p, *args, **kwargs): await p.subscribe(*args, **kwargs) |