1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
import sys
import logging
import socket
import cgi
import pytest
from webob.request import Request
from webob.response import Response
from webob.compat import url_open
from webob.compat import bytes_
from webob.compat import reraise
from webob.compat import Queue
from webob.compat import Empty
log = logging.getLogger(__name__)
@pytest.mark.usefixtures("serve")
def test_request_reading(serve):
"""
Test actual request/response cycle in the presence of Request.copy()
and other methods that can potentially hang.
"""
with serve(_test_app_req_reading) as server:
for key in _test_ops_req_read:
resp = url_open(server.url + key, timeout=3)
assert resp.read() == b"ok"
def _test_app_req_reading(env, sr):
req = Request(env)
log.debug('starting test operation: %s', req.path_info)
test_op = _test_ops_req_read[req.path_info]
test_op(req)
log.debug('done')
r = Response("ok")
return r(env, sr)
_test_ops_req_read = {
'/copy': lambda req: req.copy(),
'/read-all': lambda req: req.body_file.read(),
'/read-0': lambda req: req.body_file.read(0),
'/make-seekable': lambda req: req.make_body_seekable()
}
@pytest.mark.usefixtures("serve")
def test_interrupted_request(serve):
with serve(_test_app_req_interrupt) as server:
for path in _test_ops_req_interrupt:
_send_interrupted_req(server, path)
try:
res = _global_res.get(timeout=1)
except Empty:
raise AssertionError("Error during test %s", path)
if res is not None:
print("Error during test:", path)
reraise(res)
_global_res = Queue()
def _test_app_req_interrupt(env, sr):
target_cl = 100000
try:
req = Request(env)
cl = req.content_length
if cl != target_cl:
raise AssertionError(
'request.content_length is %s instead of %s' % (cl, target_cl))
op = _test_ops_req_interrupt[req.path_info]
log.info("Running test: %s", req.path_info)
with pytest.raises(IOError):
op(req)
except:
_global_res.put(sys.exc_info())
else:
_global_res.put(None)
sr('200 OK', [])
return []
def _req_int_cgi(req):
assert req.body_file.read(0) == b''
cgi.FieldStorage(
fp=req.body_file,
environ=req.environ,
)
def _req_int_readline(req):
try:
assert req.body_file.readline() == b'a=b\n'
except IOError:
# too early to detect disconnect
raise AssertionError("False disconnect alert")
req.body_file.readline()
_test_ops_req_interrupt = {
'/copy': lambda req: req.copy(),
'/read-body': lambda req: req.body,
'/read-post': lambda req: req.POST,
'/read-all': lambda req: req.body_file.read(),
'/read-too-much': lambda req: req.body_file.read(1 << 22),
'/readline': _req_int_readline,
'/readlines': lambda req: req.body_file.readlines(),
'/read-cgi': _req_int_cgi,
'/make-seekable': lambda req: req.make_body_seekable()
}
def _send_interrupted_req(server, path='/'):
sock = socket.socket()
sock.connect(('localhost', server.server_port))
f = sock.makefile('wb')
f.write(bytes_(_interrupted_req % path))
f.flush()
f.close()
sock.close()
_interrupted_req = (
"POST %s HTTP/1.0\r\n"
"content-type: application/x-www-form-urlencoded\r\n"
"content-length: 100000\r\n"
"\r\n"
)
_interrupted_req += 'a=b\nz=' + 'x' * 10000
|