summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Reitz <me@kennethreitz.org>2018-03-15 17:07:59 -0400
committerKenneth Reitz <me@kennethreitz.org>2018-03-15 17:07:59 -0400
commit86ab1ce9168d180b3e523dcdbd98bcb6c600bef3 (patch)
tree02a9b6501ba630aacb5a2d97e503863151ed7a33
parentd956dc1d7050aa42bb5a1d070f41b613daab9290 (diff)
downloadpython-requests-86ab1ce9168d180b3e523dcdbd98bcb6c600bef3.tar.gz
session fixture
Signed-off-by: Kenneth Reitz <me@kennethreitz.org>
-rw-r--r--tests/test_requests.py257
1 files changed, 112 insertions, 145 deletions
diff --git a/tests/test_requests.py b/tests/test_requests.py
index 4606ef5c..03ac3307 100644
--- a/tests/test_requests.py
+++ b/tests/test_requests.py
@@ -79,13 +79,16 @@ try:
except AttributeError:
HAS_PYOPENSSL = False
+@pytest.fixture
+def s(request, *args, **kwargs):
+ return requests.Session()
+
class TestRequests:
def test_entry_points(self):
- requests.session
- requests.session().get
- requests.session().head
+ requests.Session().get
+ requests.Session().head
requests.get
requests.head
requests.put
@@ -163,15 +166,14 @@ class TestRequests:
request = requests.Request('GET', url, params={"a": "b"}).prepare()
assert request.url == expected
- def test_params_original_order_is_preserved_by_default(self):
+ def test_params_original_order_is_preserved_by_default(self, s):
param_ordered_dict = collections.OrderedDict(
(('z', 1), ('a', 1), ('k', 1), ('d', 1))
)
- session = requests.Session()
request = requests.Request(
'GET', 'http://example.com/', params=param_ordered_dict
)
- prep = session.prepare_request(request)
+ prep = s.prepare_request(request)
assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1'
@@ -196,8 +198,7 @@ class TestRequests:
@pytest.mark.parametrize(
'scheme', ('http://', 'HTTP://', 'hTTp://', 'HttP://')
)
- def test_mixed_case_scheme_acceptable(self, httpbin, scheme):
- s = requests.Session()
+ def test_mixed_case_scheme_acceptable(self, s, httpbin, scheme):
s.proxies = getproxies()
parts = urlparse(httpbin('get'))
url = scheme + parts.netloc + parts.path
@@ -205,9 +206,8 @@ class TestRequests:
r = s.send(r.prepare())
assert r.status_code == 200, 'failed for scheme {0}'.format(scheme)
- def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin):
+ def test_HTTP_200_OK_GET_ALTERNATIVE(self, s, httpbin):
r = requests.Request('GET', httpbin('get'))
- s = requests.Session()
s.proxies = getproxies()
r = s.send(r.prepare())
assert r.status_code == 200
@@ -254,8 +254,7 @@ class TestRequests:
'Expected redirect to raise TooManyRedirects but it did not'
)
- def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin):
- s = requests.session()
+ def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, s, httpbin):
s.max_redirects = 5
try:
s.get(httpbin('relative-redirect', '50'))
@@ -365,44 +364,44 @@ class TestRequests:
assert r.history[0].is_redirect
assert r.request.body is None
- def test_multiple_location_headers(self, httpbin):
+ def test_multiple_location_headers(self, s, httpbin):
headers = [
('Location', 'http://example.com'),
('Location', 'https://example.com/1'),
]
params = '&'.join(['%s=%s' % (k, v) for k, v in headers])
- ses = requests.Session()
+
req = requests.Request('GET', httpbin('response-headers?%s' % params))
- prep = ses.prepare_request(req)
- resp = ses.send(prep)
+ prep = s.prepare_request(req)
+ resp = s.send(prep)
# change response to redirect
resp.status_code = 302
with pytest.raises(InvalidHeader):
# next triggers yield on generator
- next(ses.resolve_redirects(resp, prep))
+ next(s.resolve_redirects(resp, prep))
- def test_header_and_body_removal_on_redirect(self, httpbin):
+ def test_header_and_body_removal_on_redirect(self, s, httpbin):
purged_headers = ('Content-Length', 'Content-Type')
- ses = requests.Session()
+
req = requests.Request('POST', httpbin('post'), data={'test': 'data'})
- prep = ses.prepare_request(req)
- resp = ses.send(prep)
+ prep = s.prepare_request(req)
+ resp = s.send(prep)
# Mimic a redirect response
resp.status_code = 302
resp.headers['location'] = 'get'
# Run request through resolve_redirects
- next_resp = next(ses.resolve_redirects(resp, prep))
+ next_resp = next(s.resolve_redirects(resp, prep))
assert next_resp.request.body is None
for header in purged_headers:
assert header not in next_resp.request.headers
- def test_transfer_enc_removal_on_redirect(self, httpbin):
+ def test_transfer_enc_removal_on_redirect(self, s, httpbin):
purged_headers = ('Transfer-Encoding', 'Content-Type')
- ses = requests.Session()
+
req = requests.Request(
'POST', httpbin('post'), data=(b'x' for x in range(1))
)
- prep = ses.prepare_request(req)
+ prep = s.prepare_request(req)
assert 'Transfer-Encoding' in prep.headers
# Create Response to avoid https://github.com/kevin1024/pytest-httpbin/issues/33
resp = requests.Response()
@@ -413,7 +412,7 @@ class TestRequests:
resp.status_code = 302
resp.headers['location'] = httpbin('get')
# Run request through resolve_redirect
- next_resp = next(ses.resolve_redirects(resp, prep))
+ next_resp = next(s.resolve_redirects(resp, prep))
assert next_resp.request.body is None
for header in purged_headers:
assert header not in next_resp.request.headers
@@ -431,20 +430,17 @@ class TestRequests:
)
assert r.status_code == 200
- def test_set_cookie_on_301(self, httpbin):
- s = requests.session()
+ def test_set_cookie_on_301(self, s, httpbin):
url = httpbin('cookies/set?foo=bar')
s.get(url)
assert s.cookies['foo'] == 'bar'
- def test_cookie_sent_on_redirect(self, httpbin):
- s = requests.session()
+ def test_cookie_sent_on_redirect(self, s, httpbin):
s.get(httpbin('cookies/set?foo=bar'))
r = s.get(httpbin('redirect/1')) # redirects to httpbin('get')
assert 'Cookie' in r.json()['headers']
- def test_cookie_removed_on_expire(self, httpbin):
- s = requests.session()
+ def test_cookie_removed_on_expire(self, s, httpbin):
s.get(httpbin('cookies/set?foo=bar'))
assert s.cookies['foo'] == 'bar'
s.get(
@@ -455,35 +451,30 @@ class TestRequests:
)
assert 'foo' not in s.cookies
- def test_cookie_quote_wrapped(self, httpbin):
- s = requests.session()
+ def test_cookie_quote_wrapped(self, s, httpbin):
s.get(httpbin('cookies/set?foo="bar:baz"'))
assert s.cookies['foo'] == '"bar:baz"'
- def test_cookie_persists_via_api(self, httpbin):
- s = requests.session()
+ def test_cookie_persists_via_api(self, s, httpbin):
r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'})
assert 'foo' in r.request.headers['Cookie']
assert 'foo' in r.history[0].request.headers['Cookie']
- def test_request_cookie_overrides_session_cookie(self, httpbin):
- s = requests.session()
+ def test_request_cookie_overrides_session_cookie(self, s, httpbin):
s.cookies['foo'] = 'bar'
r = s.get(httpbin('cookies'), cookies={'foo': 'baz'})
assert r.json()['cookies']['foo'] == 'baz'
# Session cookie should not be modified
assert s.cookies['foo'] == 'bar'
- def test_request_cookies_not_persisted(self, httpbin):
- s = requests.session()
+ def test_request_cookies_not_persisted(self, s, httpbin):
s.get(httpbin('cookies'), cookies={'foo': 'baz'})
# Sending a request with cookies should not add cookies to the session
assert not s.cookies
- def test_generic_cookiejar_works(self, httpbin):
+ def test_generic_cookiejar_works(self, s, httpbin):
cj = cookielib.CookieJar()
cookiejar_from_dict({'foo': 'bar'}, cj)
- s = requests.session()
s.cookies = cj
r = s.get(httpbin('cookies'))
# Make sure the cookie was sent
@@ -491,22 +482,20 @@ class TestRequests:
# Make sure the session cj is still the custom one
assert s.cookies is cj
- def test_param_cookiejar_works(self, httpbin):
+ def test_param_cookiejar_works(self, s, httpbin):
cj = cookielib.CookieJar()
cookiejar_from_dict({'foo': 'bar'}, cj)
- s = requests.session()
r = s.get(httpbin('cookies'), cookies=cj)
# Make sure the cookie was sent
assert r.json()['cookies']['foo'] == 'bar'
- def test_cookielib_cookiejar_on_redirect(self, httpbin):
+ def test_cookielib_cookiejar_on_redirect(self, s, httpbin):
"""Tests resolve_redirect doesn't fail when merging cookies
with non-RequestsCookieJar cookiejar.
See GH #3579
"""
cj = cookiejar_from_dict({'foo': 'bar'}, cookielib.CookieJar())
- s = requests.Session()
s.cookies = cookiejar_from_dict({'cookie': 'tasty'})
# Prepare request without using Session
req = requests.Request('GET', httpbin('headers'), cookies=cj)
@@ -532,7 +521,7 @@ class TestRequests:
@pytest.mark.parametrize(
'jar', (requests.cookies.RequestsCookieJar(), cookielib.CookieJar())
)
- def test_custom_cookie_policy_persistence(self, httpbin, jar):
+ def test_custom_cookie_policy_persistence(self, s, httpbin, jar):
"""Verify a custom CookiePolicy is propagated on each session request."""
class TestCookiePolicy(cookielib.DefaultCookiePolicy):
@@ -544,7 +533,6 @@ class TestRequests:
)
# Establish session with jar and set some cookies.
- s = requests.Session()
s.cookies = jar
s.get(httpbin('cookies/set?k1=v1&k2=v2'))
assert len(s.cookies) == 2
@@ -571,26 +559,24 @@ class TestRequests:
assert isinstance(resp.history, list)
assert not isinstance(resp.history, tuple)
- def test_headers_on_session_with_None_are_not_sent(self, httpbin):
+ def test_headers_on_session_with_None_are_not_sent(self, httpbin, s):
"""Do not send headers in Session.headers with None values."""
- ses = requests.Session()
- ses.headers['Accept-Encoding'] = None
+ s.headers['Accept-Encoding'] = None
req = requests.Request('GET', httpbin('get'))
- prep = ses.prepare_request(req)
+ prep = s.prepare_request(req)
assert 'Accept-Encoding' not in prep.headers
- def test_headers_preserve_order(self, httpbin):
+ def test_headers_preserve_order(self, s, httpbin):
"""Preserve order when headers provided as OrderedDict."""
- ses = requests.Session()
- ses.headers = collections.OrderedDict()
- ses.headers['Accept-Encoding'] = 'identity'
- ses.headers['First'] = '1'
- ses.headers['Second'] = '2'
+ s.headers = collections.OrderedDict()
+ s.headers['Accept-Encoding'] = 'identity'
+ s.headers['First'] = '1'
+ s.headers['Second'] = '2'
headers = collections.OrderedDict([('Third', '3'), ('Fourth', '4')])
headers['Fifth'] = '5'
headers['Second'] = '222'
req = requests.Request('GET', httpbin('get'), headers=headers)
- prep = ses.prepare_request(req)
+ prep = s.prepare_request(req)
items = list(prep.headers.items())
assert items[0] == ('Accept-Encoding', 'identity')
assert items[1] == ('First', '1')
@@ -613,14 +599,13 @@ class TestRequests:
r = requests.put(httpbin('put'))
assert r.status_code == 200
- def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin):
+ def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin, s):
auth = ('user', 'pass')
url = httpbin('basic-auth', 'user', 'pass')
r = requests.get(url, auth=auth)
assert r.status_code == 200
r = requests.get(url)
assert r.status_code == 401
- s = requests.session()
s.auth = auth
r = s.get(url)
assert r.status_code == 200
@@ -678,7 +663,7 @@ class TestRequests:
proxies={'http': 'non-resolvable-address'},
)
- def test_basicauth_with_netrc(self, httpbin):
+ def test_basicauth_with_netrc(self, httpbin, s):
auth = ('user', 'pass')
wrong_auth = ('wronguser', 'wrongpass')
url = httpbin('basic-auth', 'user', 'pass')
@@ -695,7 +680,7 @@ class TestRequests:
# Given auth should override and fail.
r = requests.get(url, auth=wrong_auth)
assert r.status_code == 401
- s = requests.session()
+
# Should use netrc and work.
r = s.get(url)
assert r.status_code == 200
@@ -706,14 +691,14 @@ class TestRequests:
finally:
requests.sessions.get_netrc_auth = old_auth
- def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
+ def test_DIGEST_HTTP_200_OK_GET(self, httpbin, s):
auth = HTTPDigestAuth('user', 'pass')
url = httpbin('digest-auth', 'auth', 'user', 'pass')
r = requests.get(url, auth=auth)
assert r.status_code == 200
r = requests.get(url)
assert r.status_code == 401
- s = requests.session()
+
s.auth = HTTPDigestAuth('user', 'pass')
r = s.get(url)
assert r.status_code == 200
@@ -726,10 +711,10 @@ class TestRequests:
r = requests.get(url, auth=auth)
assert r.status_code == 200
- def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
+ def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin, s):
url = httpbin('digest-auth', 'auth', 'user', 'pass')
auth = HTTPDigestAuth('user', 'pass')
- s = requests.Session()
+
s.get(url, auth=auth)
assert s.cookies['fake'] == 'fake_value'
@@ -741,14 +726,13 @@ class TestRequests:
r = requests.get(url, auth=auth, stream=False)
assert r.raw.read() == b''
- def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
+ def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin, s):
auth = HTTPDigestAuth('user', 'wrongpass')
url = httpbin('digest-auth', 'auth', 'user', 'pass')
r = requests.get(url, auth=auth)
assert r.status_code == 401
r = requests.get(url)
assert r.status_code == 401
- s = requests.session()
s.auth = auth
r = s.get(url)
assert r.status_code == 401
@@ -1025,9 +1009,9 @@ class TestRequests:
)
assert r.status_code == 200
- def test_unicode_method_name_with_request_object(self, httpbin):
+ def test_unicode_method_name_with_request_object(self, httpbin, s):
files = {'file': open(__file__, 'rb')}
- s = requests.Session()
+
req = requests.Request(u('POST'), httpbin('post'), files=files)
prep = s.prepare_request(req)
assert isinstance(prep.method, builtin_str)
@@ -1035,8 +1019,7 @@ class TestRequests:
resp = s.send(prep)
assert resp.status_code == 200
- def test_non_prepared_request_error(self):
- s = requests.Session()
+ def test_non_prepared_request_error(self, s):
req = requests.Request(u('POST'), '/')
with pytest.raises(ValueError) as e:
s.send(req)
@@ -1058,37 +1041,36 @@ class TestRequests:
assert r.status_code == 200
assert b"text/py-content-type" in r.request.body
- def test_hook_receives_request_arguments(self, httpbin):
+ def test_hook_receives_request_arguments(self, httpbin, s):
def hook(resp, **kwargs):
assert resp is not None
assert kwargs != {}
- s = requests.Session()
r = requests.Request('GET', httpbin(), hooks={'response': hook})
prep = s.prepare_request(r)
s.send(prep)
- def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
+ def test_session_hooks_are_used_with_no_request_hooks(self, httpbin, s):
hook = lambda x, *args, **kwargs: x
- s = requests.Session()
+
s.hooks['response'].append(hook)
r = requests.Request('GET', httpbin())
prep = s.prepare_request(r)
assert prep.hooks['response'] != []
assert prep.hooks['response'] == [hook]
- def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
+ def test_session_hooks_are_overridden_by_request_hooks(self, httpbin, s):
hook1 = lambda x, *args, **kwargs: x
hook2 = lambda x, *args, **kwargs: x
assert hook1 is not hook2
- s = requests.Session()
+
s.hooks['response'].append(hook2)
r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
prep = s.prepare_request(r)
assert prep.hooks['response'] == [hook1]
- def test_prepared_request_hook(self, httpbin):
+ def test_prepared_request_hook(self, httpbin, s):
def hook(resp, **kwargs):
resp.headers['hook-working'] = 'True'
@@ -1096,12 +1078,11 @@ class TestRequests:
req = requests.Request('GET', httpbin(), hooks={'response': hook})
prep = req.prepare()
- s = requests.Session()
s.proxies = getproxies()
resp = s.send(prep)
assert resp.headers['hook-working']
- def test_prepared_from_session(self, httpbin):
+ def test_prepared_from_session(self, httpbin, s):
class DummyAuth(requests.auth.AuthBase):
@@ -1111,7 +1092,7 @@ class TestRequests:
req = requests.Request('GET', httpbin('headers'))
assert not req.auth
- s = requests.Session()
+
s.auth = DummyAuth()
prep = s.prepare_request(req)
resp = s.send(prep)
@@ -1479,7 +1460,7 @@ class TestRequests:
line.encode('utf-8') for line in expected_delimiter
]
- def test_prepared_request_is_pickleable(self, httpbin):
+ def test_prepared_request_is_pickleable(self, httpbin, s):
p = requests.Request('GET', httpbin('get')).prepare()
# Verify PreparedRequest can be pickled and unpickled
r = pickle.loads(pickle.dumps(p))
@@ -1487,11 +1468,11 @@ class TestRequests:
assert r.headers == p.headers
assert r.body == p.body
# Verify unpickled PreparedRequest sends properly
- s = requests.Session()
+
resp = s.send(r)
assert resp.status_code == 200
- def test_prepared_request_with_file_is_pickleable(self, httpbin):
+ def test_prepared_request_with_file_is_pickleable(self, httpbin, s):
files = {'file': open(__file__, 'rb')}
r = requests.Request('POST', httpbin('post'), files=files)
p = r.prepare()
@@ -1501,11 +1482,11 @@ class TestRequests:
assert r.headers == p.headers
assert r.body == p.body
# Verify unpickled PreparedRequest sends properly
- s = requests.Session()
+
resp = s.send(r)
assert resp.status_code == 200
- def test_prepared_request_with_hook_is_pickleable(self, httpbin):
+ def test_prepared_request_with_hook_is_pickleable(self, httpbin, s):
r = requests.Request('GET', httpbin('get'), hooks=default_hooks())
p = r.prepare()
# Verify PreparedRequest can be pickled
@@ -1515,7 +1496,7 @@ class TestRequests:
assert r.body == p.body
assert r.hooks == p.hooks
# Verify unpickled PreparedRequest sends properly
- s = requests.Session()
+
resp = s.send(r)
assert resp.status_code == 200
@@ -1534,17 +1515,17 @@ class TestRequests:
assert str(error) == 'message'
assert error.response == response
- def test_session_pickling(self, httpbin):
+ def test_session_pickling(self, httpbin, s):
r = requests.Request('GET', httpbin('get'))
- s = requests.Session()
+
s = pickle.loads(pickle.dumps(s))
s.proxies = getproxies()
r = s.send(r.prepare())
assert r.status_code == 200
- def test_fixes_1329(self, httpbin):
+ def test_fixes_1329(self, httpbin, s):
"""Ensure that header updates are done case-insensitively."""
- s = requests.Session()
+
s.headers.update({'ACCEPT': 'BOGUS'})
s.headers.update({'accept': 'application/json'})
r = s.get(httpbin('get'))
@@ -1560,8 +1541,8 @@ class TestRequests:
assert r.status_code == 200
assert r.url.lower() == url.lower()
- def test_transport_adapter_ordering(self):
- s = requests.Session()
+ def test_transport_adapter_ordering(self, s):
+
order = ['https://', 'http://']
assert order == list(s.adapters)
s.mount('http://git', HTTPAdapter())
@@ -1598,15 +1579,13 @@ class TestRequests:
assert 'http://' in s2.adapters
assert 'https://' in s2.adapters
- def test_header_remove_is_case_insensitive(self, httpbin):
+ def test_header_remove_is_case_insensitive(self, httpbin, s):
# From issue #1321
- s = requests.Session()
s.headers['foo'] = 'bar'
r = s.get(httpbin('get'), headers={'FOO': None})
assert 'foo' not in r.request.headers
- def test_params_are_merged_case_sensitive(self, httpbin):
- s = requests.Session()
+ def test_params_are_merged_case_sensitive(self, httpbin, s):
s.params['foo'] = 'bar'
r = s.get(httpbin('get'), params={'FOO': 'bar'})
assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'}
@@ -1736,8 +1715,8 @@ class TestRequests:
h2 = r.request.headers['Authorization']
assert h1 == h2
- def test_manual_redirect_with_partial_body_read(self, httpbin):
- s = requests.Session()
+ def test_manual_redirect_with_partial_body_read(self, httpbin, s):
+
req = requests.Request('GET', httpbin('redirect/2')).prepare()
r1 = s.send(req, allow_redirects=False, stream=True)
assert r1.is_redirect
@@ -1754,18 +1733,16 @@ class TestRequests:
r3 = next(rg)
assert not r3.is_redirect
- def test_prepare_body_position_non_stream(self):
+ def test_prepare_body_position_non_stream(self, s):
data = b'the data'
- s = requests.Session()
prep = requests.Request(
'GET', 'http://example.com', data=data
).prepare(
)
assert prep._body_position is None
- def test_rewind_body(self):
+ def test_rewind_body(self, s):
data = io.BytesIO(b'the data')
- s = requests.Session()
prep = requests.Request(
'GET', 'http://example.com', data=data
).prepare(
@@ -1778,9 +1755,8 @@ class TestRequests:
requests.utils.rewind_body(prep)
assert prep.body.read() == b'the data'
- def test_rewind_partially_read_body(self):
+ def test_rewind_partially_read_body(self, s):
data = io.BytesIO(b'the data')
- s = requests.Session()
data.read(4) # read some data
prep = requests.Request(
'GET', 'http://example.com', data=data
@@ -1794,7 +1770,7 @@ class TestRequests:
requests.utils.rewind_body(prep)
assert prep.body.read() == b'data'
- def test_rewind_body_no_seek(self):
+ def test_rewind_body_no_seek(self, s):
class BadFileObj:
@@ -1808,7 +1784,7 @@ class TestRequests:
return
data = BadFileObj('the data')
- s = requests.Session()
+
prep = requests.Request(
'GET', 'http://example.com', data=data
).prepare(
@@ -1818,7 +1794,7 @@ class TestRequests:
requests.utils.rewind_body(prep)
assert 'Unable to rewind request body' in str(e)
- def test_rewind_body_failed_seek(self):
+ def test_rewind_body_failed_seek(self, s):
class BadFileObj:
@@ -1835,7 +1811,6 @@ class TestRequests:
return
data = BadFileObj('the data')
- s = requests.Session()
prep = requests.Request(
'GET', 'http://example.com', data=data
).prepare(
@@ -1845,7 +1820,7 @@ class TestRequests:
requests.utils.rewind_body(prep)
assert 'error occurred when rewinding request body' in str(e)
- def test_rewind_body_failed_tell(self):
+ def test_rewind_body_failed_tell(self, s):
class BadFileObj:
@@ -1859,7 +1834,6 @@ class TestRequests:
return
data = BadFileObj('the data')
- s = requests.Session()
prep = requests.Request(
'GET', 'http://example.com', data=data
).prepare(
@@ -1883,8 +1857,7 @@ class TestRequests:
adapter.build_response = build_response
- def test_redirect_with_wrong_gzipped_header(self, httpbin):
- s = requests.Session()
+ def test_redirect_with_wrong_gzipped_header(self, httpbin, s):
url = httpbin('redirect/1')
self._patch_adapter_gzipped_redirect(s, url)
s.get(url)
@@ -1945,8 +1918,7 @@ class TestRequests:
assert isinstance(response, requests.Response)
assert response.raw.closed
- def test_unconsumed_session_response_closes_connection(self, httpbin):
- s = requests.session()
+ def test_unconsumed_session_response_closes_connection(self, httpbin, s):
with contextlib.closing(
s.get(httpbin('stream/4'), stream=True)
) as response:
@@ -1962,10 +1934,9 @@ class TestRequests:
next(r.iter_lines())
assert len(list(r.iter_lines())) == 3
- def test_environment_comes_after_session(self, httpbin):
+ def test_environment_comes_after_session(self, httpbin, s):
"""The Session arguments should come before environment arguments."""
# We get proxies from the environment and verify from the argument.
- s = requests.Session()
a = SendRecordingAdapter()
s.mount('http://', a)
# Both of these arguments are safe fallbacks that we can easily
@@ -1994,25 +1965,24 @@ class TestRequests:
proxies['http']
@pytest.fixture(autouse=True)
- def test_merge_environment_settings_verify(self, monkeypatch):
+ def test_merge_environment_settings_verify(self, monkeypatch, s):
"""Assert CA environment settings are merged as expected when missing"""
- session = requests.Session()
monkeypatch.delenv('CURL_CA_BUNDLE', raising=False)
monkeypatch.delenv('REQUESTS_CA_BUNDLE', raising=False)
- assert session.trust_env is True
- assert session.verify is True
+ assert s.trust_env is True
+ assert s.verify is True
assert 'REQUESTS_CA_BUNDLE' not in os.environ
assert 'CURL_CA_BUNDLE' not in os.environ
- merged_settings = session.merge_environment_settings(
+ merged_settings = s.merge_environment_settings(
'http://example.com', {}, False, True, None
)
assert merged_settings['verify'] is True
- def test_session_close_proxy_clear(self, mocker):
+ def test_session_close_proxy_clear(self, mocker, s):
proxies = {'one': mocker.Mock(), 'two': mocker.Mock()}
- session = requests.Session()
- mocker.patch.dict(session.adapters['http://'].proxy_manager, proxies)
- session.close()
+
+ mocker.patch.dict(s.adapters['http://'].proxy_manager, proxies)
+ s.close()
proxies['one'].clear.assert_called_once_with()
proxies['two'].clear.assert_called_once_with()
@@ -2046,17 +2016,16 @@ class TestRequests:
resp.close()
assert resp.raw.closed
- def test_updating_ca_cert(self, httpbin_secure):
+ def test_updating_ca_cert(self, httpbin_secure, s):
"""Assert that requests use the latest configured CA certificates."""
- session = requests.session()
- session.verify = pytest_httpbin.certs.where()
- session.get(httpbin_secure('/'))
- session.verify = True
+ s.verify = pytest_httpbin.certs.where()
+ s.get(httpbin_secure('/'))
+ s.verify = True
with pytest.raises(requests.exceptions.SSLError) as e:
- session.get(httpbin_secure('/'))
+ s.get(httpbin_secure('/'))
assert 'certificate verify failed' in str(e)
- def test_updating_client_cert(self, httpbin_secure):
+ def test_updating_client_cert(self, httpbin_secure, s):
"""Assert that requests use the latest configured client certificates."""
ca_file = pytest_httpbin.certs.where()
cert_dir = os.path.dirname(ca_file)
@@ -2065,10 +2034,10 @@ class TestRequests:
# be the server's certificate and key.
cert = os.path.join(cert_dir, 'cert.pem')
key = os.path.join(cert_dir, 'key.pem')
- session = requests.session()
- session.verify = ca_file
- resp = session.get(httpbin_secure('/'))
- resp_with_cert = session.get(httpbin_secure('/'), cert=(cert, key))
+
+ s.verify = ca_file
+ resp = s.get(httpbin_secure('/'))
+ resp_with_cert = s.get(httpbin_secure('/'), cert=(cert, key))
assert resp_with_cert.raw._pool.cert_file == cert
assert resp_with_cert.raw._pool.key_file == key
assert resp.raw._pool is not resp_with_cert.raw._pool
@@ -2591,8 +2560,8 @@ def test_requests_are_updated_each_time(httpbin):
('all_proxy', 'https://example.com', 'socks5://proxy.com:9876'),
],
)
-def test_proxy_env_vars_override_default(var, url, proxy):
- session = requests.Session()
+def test_proxy_env_vars_override_default(var, url, proxy, s):
+
prep = PreparedRequest()
prep.prepare(method='GET', url=url)
kwargs = {var: proxy}
@@ -2663,10 +2632,9 @@ def test_prepare_requires_a_request_method():
prepped.prepare()
-def test_urllib3_retries(httpbin):
+def test_urllib3_retries(httpbin, s):
from urllib3.util import Retry
- s = requests.Session()
s.mount(
'http://',
HTTPAdapter(max_retries=Retry(total=2, status_forcelist=[500])),
@@ -2675,8 +2643,7 @@ def test_urllib3_retries(httpbin):
s.get(httpbin('status/500'))
-def test_urllib3_pool_connection_closed(httpbin):
- s = requests.Session()
+def test_urllib3_pool_connection_closed(httpbin, s):
s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0))
try:
s.get(httpbin('status/200'))