diff options
author | Chayim I. Kirshen <c@kirshen.com> | 2023-04-04 12:35:08 +0300 |
---|---|---|
committer | Chayim I. Kirshen <c@kirshen.com> | 2023-04-04 12:35:08 +0300 |
commit | 519c1ae2b2f20cd51b1c091709ae5a3ea5fad072 (patch) | |
tree | b4f6c8c83f9000cf3f0054dc8ab2ade9d3575330 | |
parent | e48233d1cfecb8efe43f197d8a4acc566c31fb70 (diff) | |
download | redis-py-4.3.7.tar.gz |
close loop4.3.7
-rw-r--r-- | redis/asyncio/client.py | 10 | ||||
-rw-r--r-- | tests/test_asyncio/test_cwe_404.py | 26 |
2 files changed, 12 insertions, 24 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index f5d8b01..8628ff7 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -1185,13 +1185,9 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] command_name, self.shard_hint ) self.connection = conn - try: - return await asyncio.shield( - self._try_send_command_parse_response(conn, *args, **options) - ) - except asyncio.CancelledError: - await conn.disconnect() - raise + return await asyncio.shield( + self._try_send_command_parse_response(conn, *args, **options) + ) def pipeline_execute_command(self, *args, **options): """ diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index 19ec9dd..9d7e23f 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -26,12 +26,11 @@ class DelayProxy: self.delay = delay async def start(self): + asyncio.get_event_loop() self.server = await asyncio.start_server(self.handle, *self.addr) - self.ROUTINE = asyncio.ensure_future(self.server.serve_forever()) async def handle(self, reader, writer): # establish connection to redis - print("new connection") redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr) pipe1 = asyncio.ensure_future( pipe(reader, redis_writer, self.delay, "to redis:") @@ -39,23 +38,16 @@ class DelayProxy: pipe2 = asyncio.ensure_future( pipe(redis_reader, writer, self.delay, "from redis:") ) - try: - await pipe1 - finally: - pipe2.cancel() - await pipe2 + asyncio.gather(pipe1, pipe2) async def stop(self): - # clean up enough so that we can reuse the looper - self.ROUTINE.cancel() - loop = self.server.get_loop() - await loop.shutdown_asyncgens() + self.server.close() + await self.server.wait_closed() @pytest.mark.asyncio @pytest.mark.onlynoncluster -@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) -async def test_standalone(delay): +async def test_standalone(delay=0.05): # create a tcp socket proxy that relays data to Redis and back, # inserting 0.1 seconds of delay @@ -91,12 +83,12 @@ async def test_standalone(delay): @pytest.mark.asyncio @pytest.mark.onlynoncluster -@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) -async def test_standalone_pipeline(delay): +async def test_standalone_pipeline(delay=0.05): dp = DelayProxy( addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2 ) await dp.start() + async with Redis(host="localhost", port=5380) as r: await r.set("foo", "foo") await r.set("bar", "bar") @@ -115,9 +107,9 @@ async def test_standalone_pipeline(delay): pipe.get("bar") pipe.ping() pipe.get("foo") - pipe.reset() + await pipe.reset() - assert await pipe.execute() is None + assert await pipe.execute() in [None, []] # validating that the pipeline can be used as it could previously pipe.get("bar") |