summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChayim I. Kirshen <c@kirshen.com>2023-04-04 12:35:08 +0300
committerChayim I. Kirshen <c@kirshen.com>2023-04-04 12:35:08 +0300
commit519c1ae2b2f20cd51b1c091709ae5a3ea5fad072 (patch)
treeb4f6c8c83f9000cf3f0054dc8ab2ade9d3575330
parente48233d1cfecb8efe43f197d8a4acc566c31fb70 (diff)
downloadredis-py-4.3.7.tar.gz
close loop4.3.7
-rw-r--r--redis/asyncio/client.py10
-rw-r--r--tests/test_asyncio/test_cwe_404.py26
2 files changed, 12 insertions, 24 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py
index f5d8b01..8628ff7 100644
--- a/redis/asyncio/client.py
+++ b/redis/asyncio/client.py
@@ -1185,13 +1185,9 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
command_name, self.shard_hint
)
self.connection = conn
- try:
- return await asyncio.shield(
- self._try_send_command_parse_response(conn, *args, **options)
- )
- except asyncio.CancelledError:
- await conn.disconnect()
- raise
+ return await asyncio.shield(
+ self._try_send_command_parse_response(conn, *args, **options)
+ )
def pipeline_execute_command(self, *args, **options):
"""
diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py
index 19ec9dd..9d7e23f 100644
--- a/tests/test_asyncio/test_cwe_404.py
+++ b/tests/test_asyncio/test_cwe_404.py
@@ -26,12 +26,11 @@ class DelayProxy:
self.delay = delay
async def start(self):
+ asyncio.get_event_loop()
self.server = await asyncio.start_server(self.handle, *self.addr)
- self.ROUTINE = asyncio.ensure_future(self.server.serve_forever())
async def handle(self, reader, writer):
# establish connection to redis
- print("new connection")
redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr)
pipe1 = asyncio.ensure_future(
pipe(reader, redis_writer, self.delay, "to redis:")
@@ -39,23 +38,16 @@ class DelayProxy:
pipe2 = asyncio.ensure_future(
pipe(redis_reader, writer, self.delay, "from redis:")
)
- try:
- await pipe1
- finally:
- pipe2.cancel()
- await pipe2
+ asyncio.gather(pipe1, pipe2)
async def stop(self):
- # clean up enough so that we can reuse the looper
- self.ROUTINE.cancel()
- loop = self.server.get_loop()
- await loop.shutdown_asyncgens()
+ self.server.close()
+ await self.server.wait_closed()
@pytest.mark.asyncio
@pytest.mark.onlynoncluster
-@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2])
-async def test_standalone(delay):
+async def test_standalone(delay=0.05):
# create a tcp socket proxy that relays data to Redis and back,
# inserting 0.1 seconds of delay
@@ -91,12 +83,12 @@ async def test_standalone(delay):
@pytest.mark.asyncio
@pytest.mark.onlynoncluster
-@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2])
-async def test_standalone_pipeline(delay):
+async def test_standalone_pipeline(delay=0.05):
dp = DelayProxy(
addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2
)
await dp.start()
+
async with Redis(host="localhost", port=5380) as r:
await r.set("foo", "foo")
await r.set("bar", "bar")
@@ -115,9 +107,9 @@ async def test_standalone_pipeline(delay):
pipe.get("bar")
pipe.ping()
pipe.get("foo")
- pipe.reset()
+ await pipe.reset()
- assert await pipe.execute() is None
+ assert await pipe.execute() in [None, []]
# validating that the pipeline can be used as it could previously
pipe.get("bar")