summaryrefslogtreecommitdiff
path: root/tests/test_asyncio/test_cwe_404.py
blob: 0455bc95fae30bc69e8707e75162005e04bb30d4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import asyncio
import sys

import pytest

from redis.asyncio import Redis
from redis.asyncio.cluster import RedisCluster


async def pipe(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter, delay: float, name=""
):
    while True:
        data = await reader.read(1000)
        if not data:
            break
        await asyncio.sleep(delay)
        writer.write(data)
        await writer.drain()


class DelayProxy:
    def __init__(self, addr, redis_addr, delay: float):
        self.addr = addr
        self.redis_addr = redis_addr
        self.delay = delay

    async def start(self):
        self.server = await asyncio.start_server(self.handle, *self.addr)
        self.ROUTINE = asyncio.create_task(self.server.serve_forever())

    async def handle(self, reader, writer):
        # establish connection to redis
        redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr)
        pipe1 = asyncio.create_task(pipe(reader, redis_writer, self.delay, "to redis:"))
        pipe2 = asyncio.create_task(
            pipe(redis_reader, writer, self.delay, "from redis:")
        )
        await asyncio.gather(pipe1, pipe2)

    async def stop(self):
        # clean up enough so that we can reuse the looper
        self.ROUTINE.cancel()
        loop = self.server.get_loop()
        await loop.shutdown_asyncgens()


@pytest.mark.onlynoncluster
@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2])
async def test_standalone(delay):

    # create a tcp socket proxy that relays data to Redis and back,
    # inserting 0.1 seconds of delay
    dp = DelayProxy(
        addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2
    )
    await dp.start()

    # note that we connect to proxy, rather than to Redis directly
    # there's generally a bug with single_connection_client in 4.4 which is fixed in 4.5
    async with Redis(host="localhost", port=5380, single_connection_client=False) as r:

        await r.set("foo", "foo")
        await r.set("bar", "bar")

        t = asyncio.create_task(r.get("foo"))
        await asyncio.sleep(delay)
        t.cancel()
        try:
            await t
            sys.stderr.write("try again, we did not cancel the task in time\n")
        except asyncio.CancelledError:
            sys.stderr.write(
                "canceled task, connection is left open with unread response\n"
            )

        assert await r.get("bar") == b"bar"
        assert await r.ping()
        assert await r.get("foo") == b"foo"

    await dp.stop()


@pytest.mark.onlynoncluster
@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2])
async def test_standalone_pipeline(delay):
    dp = DelayProxy(
        addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2
    )
    await dp.start()
    async with Redis(host="localhost", port=5380) as r:
        await r.set("foo", "foo")
        await r.set("bar", "bar")

        pipe = r.pipeline()

        pipe2 = r.pipeline()
        pipe2.get("bar")
        pipe2.ping()
        pipe2.get("foo")

        t = asyncio.create_task(pipe.get("foo").execute())
        await asyncio.sleep(delay)
        t.cancel()

        pipe.get("bar")
        pipe.ping()
        pipe.get("foo")
        pipe.reset()

        assert await pipe.execute() is None

        # validating that the pipeline can be used as it could previously
        pipe.get("bar")
        pipe.ping()
        pipe.get("foo")
        assert await pipe.execute() == [b"bar", True, b"foo"]
        assert await pipe2.execute() == [b"bar", True, b"foo"]

    await dp.stop()


@pytest.mark.onlycluster
async def test_cluster(request):

    dp = DelayProxy(addr=("localhost", 5381), redis_addr=("localhost", 6372), delay=0.1)
    await dp.start()

    r = RedisCluster.from_url("redis://localhost:5381")
    await r.initialize()
    await r.set("foo", "foo")
    await r.set("bar", "bar")

    t = asyncio.create_task(r.get("foo"))
    await asyncio.sleep(0.050)
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        pytest.fail("connection is left open with unread response")

    assert await r.get("bar") == b"bar"
    assert await r.ping()
    assert await r.get("foo") == b"foo"

    await dp.stop()