Diffstat (limited to 'tests/test_asyncio/test_cwe_404.py')
-rw-r--r--  tests/test_asyncio/test_cwe_404.py | 319
1 file changed, 212 insertions(+), 107 deletions(-)
diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py
index dc62df6..d3a0666 100644
--- a/tests/test_asyncio/test_cwe_404.py
+++ b/tests/test_asyncio/test_cwe_404.py
@@ -1,147 +1,252 @@
import asyncio
-import sys
+import contextlib
import pytest
from redis.asyncio import Redis
from redis.asyncio.cluster import RedisCluster
-
-
-async def pipe(
- reader: asyncio.StreamReader, writer: asyncio.StreamWriter, delay: float, name=""
-):
- while True:
- data = await reader.read(1000)
- if not data:
- break
- await asyncio.sleep(delay)
- writer.write(data)
- await writer.drain()
+from redis.asyncio.connection import async_timeout
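+# timeout context manager taken from redis.asyncio.connection; used below only
+# to bound the connectivity check in DelayProxy.start()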
class DelayProxy:
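+    """
+    A TCP proxy between the test client and Redis that relays bytes in both
+    directions, sleeping ``delay`` seconds before forwarding each chunk.  It can
+    be used as an async context manager, which starts and stops the listener.
+    """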
- def __init__(self, addr, redis_addr, delay: float):
+ def __init__(self, addr, redis_addr, delay: float = 0.0):
self.addr = addr
self.redis_addr = redis_addr
self.delay = delay
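+        # ``send_event`` is set by ``pipe`` whenever data flows from the client
+        # towards Redis; tests wait on it to know a command has actually been sent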
+ self.send_event = asyncio.Event()
+ self.server = None
+ self.task = None
+
+ async def __aenter__(self):
+ await self.start()
+ return self
+
+ async def __aexit__(self, *args):
+ await self.stop()
async def start(self):
- self.server = await asyncio.start_server(self.handle, *self.addr)
- self.ROUTINE = asyncio.create_task(self.server.serve_forever())
+ # test that we can connect to redis
+ async with async_timeout(2):
+ _, redis_writer = await asyncio.open_connection(*self.redis_addr)
+ redis_writer.close()
+ self.server = await asyncio.start_server(
+ self.handle, *self.addr, reuse_address=True
+ )
+ self.task = asyncio.create_task(self.server.serve_forever())
+
+ @contextlib.contextmanager
+ def set_delay(self, delay: float = 0.0):
+        """
+        Allow the delay to be overridden for parts of tests which aren't time
+        dependent, to speed up execution.
+        """
+ old_delay = self.delay
+ self.delay = delay
+ try:
+ yield
+ finally:
+ self.delay = old_delay
async def handle(self, reader, writer):
# establish connection to redis
redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr)
- pipe1 = asyncio.create_task(pipe(reader, redis_writer, self.delay, "to redis:"))
- pipe2 = asyncio.create_task(
- pipe(redis_reader, writer, self.delay, "from redis:")
- )
- await asyncio.gather(pipe1, pipe2)
+ try:
+ pipe1 = asyncio.create_task(
+ self.pipe(reader, redis_writer, "to redis:", self.send_event)
+ )
+ pipe2 = asyncio.create_task(self.pipe(redis_reader, writer, "from redis:"))
+ await asyncio.gather(pipe1, pipe2)
+ finally:
+ redis_writer.close()
async def stop(self):
# clean up enough so that we can reuse the looper
- self.ROUTINE.cancel()
+ self.task.cancel()
+ try:
+ await self.task
+ except asyncio.CancelledError:
+ pass
loop = self.server.get_loop()
await loop.shutdown_asyncgens()
+ async def pipe(
+ self,
+ reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter,
+ name="",
+ event: asyncio.Event = None,
+ ):
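+        """
+        Copy data from ``reader`` to ``writer``, sleeping ``self.delay`` seconds
+        between each read and the corresponding write.  If ``event`` is given,
+        it is set as soon as data is seen, so tests can detect that a request
+        has reached the proxy.
+        """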
+ while True:
+ data = await reader.read(1000)
+ if not data:
+ break
+ # print(f"{name} read {len(data)} delay {self.delay}")
+ if event:
+ event.set()
+ await asyncio.sleep(self.delay)
+ writer.write(data)
+ await writer.drain()
+
@pytest.mark.onlynoncluster
@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2])
-async def test_standalone(delay):
+async def test_standalone(delay, master_host):
# create a tcp socket proxy that relays data to Redis and back,
# inserting 0.1 seconds of delay
- dp = DelayProxy(
- addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2
- )
- await dp.start()
-
- for b in [True, False]:
- # note that we connect to proxy, rather than to Redis directly
- async with Redis(host="localhost", port=5380, single_connection_client=b) as r:
-
- await r.set("foo", "foo")
- await r.set("bar", "bar")
-
- t = asyncio.create_task(r.get("foo"))
- await asyncio.sleep(delay)
- t.cancel()
- try:
- await t
- sys.stderr.write("try again, we did not cancel the task in time\n")
- except asyncio.CancelledError:
- sys.stderr.write(
- "canceled task, connection is left open with unread response\n"
- )
-
- assert await r.get("bar") == b"bar"
- assert await r.ping()
- assert await r.get("foo") == b"foo"
-
- await dp.stop()
-
-
+ async with DelayProxy(addr=("127.0.0.1", 5380), redis_addr=master_host) as dp:
+
+ for b in [True, False]:
+ # note that we connect to proxy, rather than to Redis directly
+ async with Redis(
+ host="127.0.0.1", port=5380, single_connection_client=b
+ ) as r:
+
+ await r.set("foo", "foo")
+ await r.set("bar", "bar")
+
+ async def op(r):
+ with dp.set_delay(delay * 2):
+ return await r.get(
+ "foo"
+ ) # <-- this is the operation we want to cancel
+
+ dp.send_event.clear()
+ t = asyncio.create_task(op(r))
+                # Wait until the task has sent its request, and then a little
+                # longer, to make sure it has settled on the read.
+ await dp.send_event.wait()
+ await asyncio.sleep(0.01) # a little extra time for prudence
+ t.cancel()
+ with pytest.raises(asyncio.CancelledError):
+ await t
+
+                # make sure that our previous request, cancelled while waiting for
+                # a response, didn't leave the connection open and in a bad state
+ assert await r.get("bar") == b"bar"
+ assert await r.ping()
+ assert await r.get("foo") == b"foo"
+
+
+@pytest.mark.xfail(reason="cancel does not cause disconnect")
@pytest.mark.onlynoncluster
@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2])
-async def test_standalone_pipeline(delay):
- dp = DelayProxy(
- addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2
- )
- await dp.start()
- for b in [True, False]:
- async with Redis(host="localhost", port=5380, single_connection_client=b) as r:
- await r.set("foo", "foo")
- await r.set("bar", "bar")
-
- pipe = r.pipeline()
-
- pipe2 = r.pipeline()
- pipe2.get("bar")
- pipe2.ping()
- pipe2.get("foo")
-
- t = asyncio.create_task(pipe.get("foo").execute())
- await asyncio.sleep(delay)
- t.cancel()
-
- pipe.get("bar")
- pipe.ping()
- pipe.get("foo")
- pipe.reset()
-
- assert await pipe.execute() is None
-
- # validating that the pipeline can be used as it could previously
- pipe.get("bar")
- pipe.ping()
- pipe.get("foo")
- assert await pipe.execute() == [b"bar", True, b"foo"]
- assert await pipe2.execute() == [b"bar", True, b"foo"]
-
- await dp.stop()
+async def test_standalone_pipeline(delay, master_host):
+ async with DelayProxy(addr=("127.0.0.1", 5380), redis_addr=master_host) as dp:
+ for b in [True, False]:
+ async with Redis(
+ host="127.0.0.1", port=5380, single_connection_client=b
+ ) as r:
+ await r.set("foo", "foo")
+ await r.set("bar", "bar")
+
+ pipe = r.pipeline()
+
+ pipe2 = r.pipeline()
+ pipe2.get("bar")
+ pipe2.ping()
+ pipe2.get("foo")
+
+ async def op(pipe):
+ with dp.set_delay(delay * 2):
+ return await pipe.get(
+ "foo"
+ ).execute() # <-- this is the operation we want to cancel
+
+ dp.send_event.clear()
+ t = asyncio.create_task(op(pipe))
+ # wait until task has settled on the read
+ await dp.send_event.wait()
+ await asyncio.sleep(0.01)
+ t.cancel()
+ with pytest.raises(asyncio.CancelledError):
+ await t
+
+                # we have now cancelled the pipeline in the middle of a request;
+                # make sure that the connection is still usable
+ pipe.get("bar")
+ pipe.ping()
+ pipe.get("foo")
+ await pipe.reset()
+
+ # check that the pipeline is empty after reset
+ assert await pipe.execute() == []
+
+ # validating that the pipeline can be used as it could previously
+ pipe.get("bar")
+ pipe.ping()
+ pipe.get("foo")
+ assert await pipe.execute() == [b"bar", True, b"foo"]
+ assert await pipe2.execute() == [b"bar", True, b"foo"]
@pytest.mark.onlycluster
-async def test_cluster(request):
+async def test_cluster(master_host):
+
+ delay = 0.1
+ cluster_port = 6372
+ remap_base = 7372
+ n_nodes = 6
+ hostname, _ = master_host
+
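+    # The cluster nodes listen on ports cluster_port .. cluster_port + n_nodes - 1;
+    # ``remap`` translates such an address to the matching proxy port so that every
+    # connection the cluster client opens goes through a DelayProxy.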
+ def remap(address):
+ host, port = address
+ return host, remap_base + port - cluster_port
+
+ proxies = []
+ for i in range(n_nodes):
+ port = cluster_port + i
+ remapped = remap_base + i
+ forward_addr = hostname, port
+ proxy = DelayProxy(addr=("127.0.0.1", remapped), redis_addr=forward_addr)
+ proxies.append(proxy)
+
+ def all_clear():
+ for p in proxies:
+ p.send_event.clear()
+
+    async def wait_for_send():
+        # block until at least one proxy has seen the request go out
+        await asyncio.wait(
+            [asyncio.ensure_future(p.send_event.wait()) for p in proxies],
+            return_when=asyncio.FIRST_COMPLETED,
+        )
- dp = DelayProxy(addr=("localhost", 5381), redis_addr=("localhost", 6372), delay=0.1)
- await dp.start()
+ @contextlib.contextmanager
+ def set_delay(delay: float):
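+        """Apply ``delay`` to every proxy for the duration of the block."""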
+ with contextlib.ExitStack() as stack:
+ for p in proxies:
+ stack.enter_context(p.set_delay(delay))
+ yield
+
+ async with contextlib.AsyncExitStack() as stack:
+ for p in proxies:
+ await stack.enter_async_context(p)
+
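+        # ``address_remap`` rewrites the node addresses the client discovers from
+        # the cluster, so every node connection is routed through the matching
+        # DelayProxy instead of reaching the node directly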
+ with contextlib.closing(
+ RedisCluster.from_url(
+ f"redis://127.0.0.1:{remap_base}", address_remap=remap
+ )
+ ) as r:
+ await r.initialize()
+ await r.set("foo", "foo")
+ await r.set("bar", "bar")
- r = RedisCluster.from_url("redis://localhost:5381")
- await r.initialize()
- await r.set("foo", "foo")
- await r.set("bar", "bar")
+ async def op(r):
+ with set_delay(delay):
+ return await r.get("foo")
- t = asyncio.create_task(r.get("foo"))
- await asyncio.sleep(0.050)
- t.cancel()
- try:
- await t
- except asyncio.CancelledError:
- pytest.fail("connection is left open with unread response")
+ all_clear()
+ t = asyncio.create_task(op(r))
+ # Wait for whichever DelayProxy gets the request first
+ await wait_for_send()
+ await asyncio.sleep(0.01)
+ t.cancel()
+ with pytest.raises(asyncio.CancelledError):
+ await t
- assert await r.get("bar") == b"bar"
- assert await r.ping()
- assert await r.get("foo") == b"foo"
+            # try a number of requests to exercise all the connections
+ async def doit():
+ assert await r.get("bar") == b"bar"
+ assert await r.ping()
+ assert await r.get("foo") == b"foo"
- await dp.stop()
+ await asyncio.gather(*[doit() for _ in range(10)])