summaryrefslogtreecommitdiff
path: root/tests/test_in_wsgiref.py
blob: 868b7c76c0412b2e2f52ed503d5dc3b24cd1901a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import sys
import logging
import socket
import cgi

import pytest

from webob.request import Request
from webob.response import Response
from webob.compat import url_open
from webob.compat import bytes_
from webob.compat import reraise
from webob.compat import Queue
from webob.compat import Empty

log = logging.getLogger(__name__)

@pytest.mark.usefixtures("serve")
def test_request_reading(serve):
    """
        Test actual request/response cycle in the presence of Request.copy()
        and other methods that can potentially hang.
    """
    with serve(_test_app_req_reading) as server:
        for key in _test_ops_req_read:
            resp = url_open(server.url + key, timeout=3)
            assert resp.read() == b"ok"

def _test_app_req_reading(env, sr):
    req = Request(env)
    log.debug('starting test operation: %s', req.path_info)
    test_op = _test_ops_req_read[req.path_info]
    test_op(req)
    log.debug('done')
    r = Response("ok")
    return r(env, sr)


_test_ops_req_read = {
    '/copy': lambda req: req.copy(),
    '/read-all': lambda req: req.body_file.read(),
    '/read-0': lambda req: req.body_file.read(0),
    '/make-seekable': lambda req: req.make_body_seekable()
}

@pytest.mark.usefixtures("serve")
def test_interrupted_request(serve):
    with serve(_test_app_req_interrupt) as server:
        for path in _test_ops_req_interrupt:
            _send_interrupted_req(server, path)
            try:
                res = _global_res.get(timeout=1)
            except Empty:
                raise AssertionError("Error during test %s", path)
            if res is not None:
                print("Error during test:", path)
                reraise(res)


_global_res = Queue()


def _test_app_req_interrupt(env, sr):
    target_cl = 100000
    try:
        req = Request(env)
        cl = req.content_length
        if cl != target_cl:
            raise AssertionError(
                'request.content_length is %s instead of %s' % (cl, target_cl))
        op = _test_ops_req_interrupt[req.path_info]
        log.info("Running test: %s", req.path_info)
        with pytest.raises(IOError):
            op(req)
    except:
        _global_res.put(sys.exc_info())
    else:
        _global_res.put(None)
        sr('200 OK', [])
        return []

def _req_int_cgi(req):
    assert req.body_file.read(0) == b''
    cgi.FieldStorage(
        fp=req.body_file,
        environ=req.environ,
    )

def _req_int_readline(req):
    try:
        assert req.body_file.readline() == b'a=b\n'
    except IOError:
        # too early to detect disconnect
        raise AssertionError("False disconnect alert")
    req.body_file.readline()


_test_ops_req_interrupt = {
    '/copy': lambda req: req.copy(),
    '/read-body': lambda req: req.body,
    '/read-post': lambda req: req.POST,
    '/read-all': lambda req: req.body_file.read(),
    '/read-too-much': lambda req: req.body_file.read(1 << 22),
    '/readline': _req_int_readline,
    '/readlines': lambda req: req.body_file.readlines(),
    '/read-cgi': _req_int_cgi,
    '/make-seekable': lambda req: req.make_body_seekable()
}


def _send_interrupted_req(server, path='/'):
    sock = socket.socket()
    sock.connect(('localhost', server.server_port))
    f = sock.makefile('wb')
    f.write(bytes_(_interrupted_req % path))
    f.flush()
    f.close()
    sock.close()


_interrupted_req = (
    "POST %s HTTP/1.0\r\n"
    "content-type: application/x-www-form-urlencoded\r\n"
    "content-length: 100000\r\n"
    "\r\n"
)
_interrupted_req += 'a=b\nz=' + 'x' * 10000