diff options
| author | Kenneth Reitz <me@kennethreitz.org> | 2018-03-15 17:07:59 -0400 |
|---|---|---|
| committer | Kenneth Reitz <me@kennethreitz.org> | 2018-03-15 17:07:59 -0400 |
| commit | 86ab1ce9168d180b3e523dcdbd98bcb6c600bef3 (patch) | |
| tree | 02a9b6501ba630aacb5a2d97e503863151ed7a33 | |
| parent | d956dc1d7050aa42bb5a1d070f41b613daab9290 (diff) | |
| download | python-requests-86ab1ce9168d180b3e523dcdbd98bcb6c600bef3.tar.gz | |
session fixture
Signed-off-by: Kenneth Reitz <me@kennethreitz.org>
| -rw-r--r-- | tests/test_requests.py | 257 |
1 files changed, 112 insertions, 145 deletions
diff --git a/tests/test_requests.py b/tests/test_requests.py index 4606ef5c..03ac3307 100644 --- a/tests/test_requests.py +++ b/tests/test_requests.py @@ -79,13 +79,16 @@ try: except AttributeError: HAS_PYOPENSSL = False +@pytest.fixture +def s(request, *args, **kwargs): + return requests.Session() + class TestRequests: def test_entry_points(self): - requests.session - requests.session().get - requests.session().head + requests.Session().get + requests.Session().head requests.get requests.head requests.put @@ -163,15 +166,14 @@ class TestRequests: request = requests.Request('GET', url, params={"a": "b"}).prepare() assert request.url == expected - def test_params_original_order_is_preserved_by_default(self): + def test_params_original_order_is_preserved_by_default(self, s): param_ordered_dict = collections.OrderedDict( (('z', 1), ('a', 1), ('k', 1), ('d', 1)) ) - session = requests.Session() request = requests.Request( 'GET', 'http://example.com/', params=param_ordered_dict ) - prep = session.prepare_request(request) + prep = s.prepare_request(request) assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1' @@ -196,8 +198,7 @@ class TestRequests: @pytest.mark.parametrize( 'scheme', ('http://', 'HTTP://', 'hTTp://', 'HttP://') ) - def test_mixed_case_scheme_acceptable(self, httpbin, scheme): - s = requests.Session() + def test_mixed_case_scheme_acceptable(self, s, httpbin, scheme): s.proxies = getproxies() parts = urlparse(httpbin('get')) url = scheme + parts.netloc + parts.path @@ -205,9 +206,8 @@ class TestRequests: r = s.send(r.prepare()) assert r.status_code == 200, 'failed for scheme {0}'.format(scheme) - def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin): + def test_HTTP_200_OK_GET_ALTERNATIVE(self, s, httpbin): r = requests.Request('GET', httpbin('get')) - s = requests.Session() s.proxies = getproxies() r = s.send(r.prepare()) assert r.status_code == 200 @@ -254,8 +254,7 @@ class TestRequests: 'Expected redirect to raise TooManyRedirects but it did not' ) - def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin): - s = requests.session() + def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, s, httpbin): s.max_redirects = 5 try: s.get(httpbin('relative-redirect', '50')) @@ -365,44 +364,44 @@ class TestRequests: assert r.history[0].is_redirect assert r.request.body is None - def test_multiple_location_headers(self, httpbin): + def test_multiple_location_headers(self, s, httpbin): headers = [ ('Location', 'http://example.com'), ('Location', 'https://example.com/1'), ] params = '&'.join(['%s=%s' % (k, v) for k, v in headers]) - ses = requests.Session() + req = requests.Request('GET', httpbin('response-headers?%s' % params)) - prep = ses.prepare_request(req) - resp = ses.send(prep) + prep = s.prepare_request(req) + resp = s.send(prep) # change response to redirect resp.status_code = 302 with pytest.raises(InvalidHeader): # next triggers yield on generator - next(ses.resolve_redirects(resp, prep)) + next(s.resolve_redirects(resp, prep)) - def test_header_and_body_removal_on_redirect(self, httpbin): + def test_header_and_body_removal_on_redirect(self, s, httpbin): purged_headers = ('Content-Length', 'Content-Type') - ses = requests.Session() + req = requests.Request('POST', httpbin('post'), data={'test': 'data'}) - prep = ses.prepare_request(req) - resp = ses.send(prep) + prep = s.prepare_request(req) + resp = s.send(prep) # Mimic a redirect response resp.status_code = 302 resp.headers['location'] = 'get' # Run request through resolve_redirects - next_resp = next(ses.resolve_redirects(resp, prep)) + next_resp = next(s.resolve_redirects(resp, prep)) assert next_resp.request.body is None for header in purged_headers: assert header not in next_resp.request.headers - def test_transfer_enc_removal_on_redirect(self, httpbin): + def test_transfer_enc_removal_on_redirect(self, s, httpbin): purged_headers = ('Transfer-Encoding', 'Content-Type') - ses = requests.Session() + req = requests.Request( 'POST', httpbin('post'), data=(b'x' for x in range(1)) ) - prep = ses.prepare_request(req) + prep = s.prepare_request(req) assert 'Transfer-Encoding' in prep.headers # Create Response to avoid https://github.com/kevin1024/pytest-httpbin/issues/33 resp = requests.Response() @@ -413,7 +412,7 @@ class TestRequests: resp.status_code = 302 resp.headers['location'] = httpbin('get') # Run request through resolve_redirect - next_resp = next(ses.resolve_redirects(resp, prep)) + next_resp = next(s.resolve_redirects(resp, prep)) assert next_resp.request.body is None for header in purged_headers: assert header not in next_resp.request.headers @@ -431,20 +430,17 @@ class TestRequests: ) assert r.status_code == 200 - def test_set_cookie_on_301(self, httpbin): - s = requests.session() + def test_set_cookie_on_301(self, s, httpbin): url = httpbin('cookies/set?foo=bar') s.get(url) assert s.cookies['foo'] == 'bar' - def test_cookie_sent_on_redirect(self, httpbin): - s = requests.session() + def test_cookie_sent_on_redirect(self, s, httpbin): s.get(httpbin('cookies/set?foo=bar')) r = s.get(httpbin('redirect/1')) # redirects to httpbin('get') assert 'Cookie' in r.json()['headers'] - def test_cookie_removed_on_expire(self, httpbin): - s = requests.session() + def test_cookie_removed_on_expire(self, s, httpbin): s.get(httpbin('cookies/set?foo=bar')) assert s.cookies['foo'] == 'bar' s.get( @@ -455,35 +451,30 @@ class TestRequests: ) assert 'foo' not in s.cookies - def test_cookie_quote_wrapped(self, httpbin): - s = requests.session() + def test_cookie_quote_wrapped(self, s, httpbin): s.get(httpbin('cookies/set?foo="bar:baz"')) assert s.cookies['foo'] == '"bar:baz"' - def test_cookie_persists_via_api(self, httpbin): - s = requests.session() + def test_cookie_persists_via_api(self, s, httpbin): r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'}) assert 'foo' in r.request.headers['Cookie'] assert 'foo' in r.history[0].request.headers['Cookie'] - def test_request_cookie_overrides_session_cookie(self, httpbin): - s = requests.session() + def test_request_cookie_overrides_session_cookie(self, s, httpbin): s.cookies['foo'] = 'bar' r = s.get(httpbin('cookies'), cookies={'foo': 'baz'}) assert r.json()['cookies']['foo'] == 'baz' # Session cookie should not be modified assert s.cookies['foo'] == 'bar' - def test_request_cookies_not_persisted(self, httpbin): - s = requests.session() + def test_request_cookies_not_persisted(self, s, httpbin): s.get(httpbin('cookies'), cookies={'foo': 'baz'}) # Sending a request with cookies should not add cookies to the session assert not s.cookies - def test_generic_cookiejar_works(self, httpbin): + def test_generic_cookiejar_works(self, s, httpbin): cj = cookielib.CookieJar() cookiejar_from_dict({'foo': 'bar'}, cj) - s = requests.session() s.cookies = cj r = s.get(httpbin('cookies')) # Make sure the cookie was sent @@ -491,22 +482,20 @@ class TestRequests: # Make sure the session cj is still the custom one assert s.cookies is cj - def test_param_cookiejar_works(self, httpbin): + def test_param_cookiejar_works(self, s, httpbin): cj = cookielib.CookieJar() cookiejar_from_dict({'foo': 'bar'}, cj) - s = requests.session() r = s.get(httpbin('cookies'), cookies=cj) # Make sure the cookie was sent assert r.json()['cookies']['foo'] == 'bar' - def test_cookielib_cookiejar_on_redirect(self, httpbin): + def test_cookielib_cookiejar_on_redirect(self, s, httpbin): """Tests resolve_redirect doesn't fail when merging cookies with non-RequestsCookieJar cookiejar. See GH #3579 """ cj = cookiejar_from_dict({'foo': 'bar'}, cookielib.CookieJar()) - s = requests.Session() s.cookies = cookiejar_from_dict({'cookie': 'tasty'}) # Prepare request without using Session req = requests.Request('GET', httpbin('headers'), cookies=cj) @@ -532,7 +521,7 @@ class TestRequests: @pytest.mark.parametrize( 'jar', (requests.cookies.RequestsCookieJar(), cookielib.CookieJar()) ) - def test_custom_cookie_policy_persistence(self, httpbin, jar): + def test_custom_cookie_policy_persistence(self, s, httpbin, jar): """Verify a custom CookiePolicy is propagated on each session request.""" class TestCookiePolicy(cookielib.DefaultCookiePolicy): @@ -544,7 +533,6 @@ class TestRequests: ) # Establish session with jar and set some cookies. - s = requests.Session() s.cookies = jar s.get(httpbin('cookies/set?k1=v1&k2=v2')) assert len(s.cookies) == 2 @@ -571,26 +559,24 @@ class TestRequests: assert isinstance(resp.history, list) assert not isinstance(resp.history, tuple) - def test_headers_on_session_with_None_are_not_sent(self, httpbin): + def test_headers_on_session_with_None_are_not_sent(self, httpbin, s): """Do not send headers in Session.headers with None values.""" - ses = requests.Session() - ses.headers['Accept-Encoding'] = None + s.headers['Accept-Encoding'] = None req = requests.Request('GET', httpbin('get')) - prep = ses.prepare_request(req) + prep = s.prepare_request(req) assert 'Accept-Encoding' not in prep.headers - def test_headers_preserve_order(self, httpbin): + def test_headers_preserve_order(self, s, httpbin): """Preserve order when headers provided as OrderedDict.""" - ses = requests.Session() - ses.headers = collections.OrderedDict() - ses.headers['Accept-Encoding'] = 'identity' - ses.headers['First'] = '1' - ses.headers['Second'] = '2' + s.headers = collections.OrderedDict() + s.headers['Accept-Encoding'] = 'identity' + s.headers['First'] = '1' + s.headers['Second'] = '2' headers = collections.OrderedDict([('Third', '3'), ('Fourth', '4')]) headers['Fifth'] = '5' headers['Second'] = '222' req = requests.Request('GET', httpbin('get'), headers=headers) - prep = ses.prepare_request(req) + prep = s.prepare_request(req) items = list(prep.headers.items()) assert items[0] == ('Accept-Encoding', 'identity') assert items[1] == ('First', '1') @@ -613,14 +599,13 @@ class TestRequests: r = requests.put(httpbin('put')) assert r.status_code == 200 - def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin): + def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin, s): auth = ('user', 'pass') url = httpbin('basic-auth', 'user', 'pass') r = requests.get(url, auth=auth) assert r.status_code == 200 r = requests.get(url) assert r.status_code == 401 - s = requests.session() s.auth = auth r = s.get(url) assert r.status_code == 200 @@ -678,7 +663,7 @@ class TestRequests: proxies={'http': 'non-resolvable-address'}, ) - def test_basicauth_with_netrc(self, httpbin): + def test_basicauth_with_netrc(self, httpbin, s): auth = ('user', 'pass') wrong_auth = ('wronguser', 'wrongpass') url = httpbin('basic-auth', 'user', 'pass') @@ -695,7 +680,7 @@ class TestRequests: # Given auth should override and fail. r = requests.get(url, auth=wrong_auth) assert r.status_code == 401 - s = requests.session() + # Should use netrc and work. r = s.get(url) assert r.status_code == 200 @@ -706,14 +691,14 @@ class TestRequests: finally: requests.sessions.get_netrc_auth = old_auth - def test_DIGEST_HTTP_200_OK_GET(self, httpbin): + def test_DIGEST_HTTP_200_OK_GET(self, httpbin, s): auth = HTTPDigestAuth('user', 'pass') url = httpbin('digest-auth', 'auth', 'user', 'pass') r = requests.get(url, auth=auth) assert r.status_code == 200 r = requests.get(url) assert r.status_code == 401 - s = requests.session() + s.auth = HTTPDigestAuth('user', 'pass') r = s.get(url) assert r.status_code == 200 @@ -726,10 +711,10 @@ class TestRequests: r = requests.get(url, auth=auth) assert r.status_code == 200 - def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin): + def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin, s): url = httpbin('digest-auth', 'auth', 'user', 'pass') auth = HTTPDigestAuth('user', 'pass') - s = requests.Session() + s.get(url, auth=auth) assert s.cookies['fake'] == 'fake_value' @@ -741,14 +726,13 @@ class TestRequests: r = requests.get(url, auth=auth, stream=False) assert r.raw.read() == b'' - def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin): + def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin, s): auth = HTTPDigestAuth('user', 'wrongpass') url = httpbin('digest-auth', 'auth', 'user', 'pass') r = requests.get(url, auth=auth) assert r.status_code == 401 r = requests.get(url) assert r.status_code == 401 - s = requests.session() s.auth = auth r = s.get(url) assert r.status_code == 401 @@ -1025,9 +1009,9 @@ class TestRequests: ) assert r.status_code == 200 - def test_unicode_method_name_with_request_object(self, httpbin): + def test_unicode_method_name_with_request_object(self, httpbin, s): files = {'file': open(__file__, 'rb')} - s = requests.Session() + req = requests.Request(u('POST'), httpbin('post'), files=files) prep = s.prepare_request(req) assert isinstance(prep.method, builtin_str) @@ -1035,8 +1019,7 @@ class TestRequests: resp = s.send(prep) assert resp.status_code == 200 - def test_non_prepared_request_error(self): - s = requests.Session() + def test_non_prepared_request_error(self, s): req = requests.Request(u('POST'), '/') with pytest.raises(ValueError) as e: s.send(req) @@ -1058,37 +1041,36 @@ class TestRequests: assert r.status_code == 200 assert b"text/py-content-type" in r.request.body - def test_hook_receives_request_arguments(self, httpbin): + def test_hook_receives_request_arguments(self, httpbin, s): def hook(resp, **kwargs): assert resp is not None assert kwargs != {} - s = requests.Session() r = requests.Request('GET', httpbin(), hooks={'response': hook}) prep = s.prepare_request(r) s.send(prep) - def test_session_hooks_are_used_with_no_request_hooks(self, httpbin): + def test_session_hooks_are_used_with_no_request_hooks(self, httpbin, s): hook = lambda x, *args, **kwargs: x - s = requests.Session() + s.hooks['response'].append(hook) r = requests.Request('GET', httpbin()) prep = s.prepare_request(r) assert prep.hooks['response'] != [] assert prep.hooks['response'] == [hook] - def test_session_hooks_are_overridden_by_request_hooks(self, httpbin): + def test_session_hooks_are_overridden_by_request_hooks(self, httpbin, s): hook1 = lambda x, *args, **kwargs: x hook2 = lambda x, *args, **kwargs: x assert hook1 is not hook2 - s = requests.Session() + s.hooks['response'].append(hook2) r = requests.Request('GET', httpbin(), hooks={'response': [hook1]}) prep = s.prepare_request(r) assert prep.hooks['response'] == [hook1] - def test_prepared_request_hook(self, httpbin): + def test_prepared_request_hook(self, httpbin, s): def hook(resp, **kwargs): resp.headers['hook-working'] = 'True' @@ -1096,12 +1078,11 @@ class TestRequests: req = requests.Request('GET', httpbin(), hooks={'response': hook}) prep = req.prepare() - s = requests.Session() s.proxies = getproxies() resp = s.send(prep) assert resp.headers['hook-working'] - def test_prepared_from_session(self, httpbin): + def test_prepared_from_session(self, httpbin, s): class DummyAuth(requests.auth.AuthBase): @@ -1111,7 +1092,7 @@ class TestRequests: req = requests.Request('GET', httpbin('headers')) assert not req.auth - s = requests.Session() + s.auth = DummyAuth() prep = s.prepare_request(req) resp = s.send(prep) @@ -1479,7 +1460,7 @@ class TestRequests: line.encode('utf-8') for line in expected_delimiter ] - def test_prepared_request_is_pickleable(self, httpbin): + def test_prepared_request_is_pickleable(self, httpbin, s): p = requests.Request('GET', httpbin('get')).prepare() # Verify PreparedRequest can be pickled and unpickled r = pickle.loads(pickle.dumps(p)) @@ -1487,11 +1468,11 @@ class TestRequests: assert r.headers == p.headers assert r.body == p.body # Verify unpickled PreparedRequest sends properly - s = requests.Session() + resp = s.send(r) assert resp.status_code == 200 - def test_prepared_request_with_file_is_pickleable(self, httpbin): + def test_prepared_request_with_file_is_pickleable(self, httpbin, s): files = {'file': open(__file__, 'rb')} r = requests.Request('POST', httpbin('post'), files=files) p = r.prepare() @@ -1501,11 +1482,11 @@ class TestRequests: assert r.headers == p.headers assert r.body == p.body # Verify unpickled PreparedRequest sends properly - s = requests.Session() + resp = s.send(r) assert resp.status_code == 200 - def test_prepared_request_with_hook_is_pickleable(self, httpbin): + def test_prepared_request_with_hook_is_pickleable(self, httpbin, s): r = requests.Request('GET', httpbin('get'), hooks=default_hooks()) p = r.prepare() # Verify PreparedRequest can be pickled @@ -1515,7 +1496,7 @@ class TestRequests: assert r.body == p.body assert r.hooks == p.hooks # Verify unpickled PreparedRequest sends properly - s = requests.Session() + resp = s.send(r) assert resp.status_code == 200 @@ -1534,17 +1515,17 @@ class TestRequests: assert str(error) == 'message' assert error.response == response - def test_session_pickling(self, httpbin): + def test_session_pickling(self, httpbin, s): r = requests.Request('GET', httpbin('get')) - s = requests.Session() + s = pickle.loads(pickle.dumps(s)) s.proxies = getproxies() r = s.send(r.prepare()) assert r.status_code == 200 - def test_fixes_1329(self, httpbin): + def test_fixes_1329(self, httpbin, s): """Ensure that header updates are done case-insensitively.""" - s = requests.Session() + s.headers.update({'ACCEPT': 'BOGUS'}) s.headers.update({'accept': 'application/json'}) r = s.get(httpbin('get')) @@ -1560,8 +1541,8 @@ class TestRequests: assert r.status_code == 200 assert r.url.lower() == url.lower() - def test_transport_adapter_ordering(self): - s = requests.Session() + def test_transport_adapter_ordering(self, s): + order = ['https://', 'http://'] assert order == list(s.adapters) s.mount('http://git', HTTPAdapter()) @@ -1598,15 +1579,13 @@ class TestRequests: assert 'http://' in s2.adapters assert 'https://' in s2.adapters - def test_header_remove_is_case_insensitive(self, httpbin): + def test_header_remove_is_case_insensitive(self, httpbin, s): # From issue #1321 - s = requests.Session() s.headers['foo'] = 'bar' r = s.get(httpbin('get'), headers={'FOO': None}) assert 'foo' not in r.request.headers - def test_params_are_merged_case_sensitive(self, httpbin): - s = requests.Session() + def test_params_are_merged_case_sensitive(self, httpbin, s): s.params['foo'] = 'bar' r = s.get(httpbin('get'), params={'FOO': 'bar'}) assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'} @@ -1736,8 +1715,8 @@ class TestRequests: h2 = r.request.headers['Authorization'] assert h1 == h2 - def test_manual_redirect_with_partial_body_read(self, httpbin): - s = requests.Session() + def test_manual_redirect_with_partial_body_read(self, httpbin, s): + req = requests.Request('GET', httpbin('redirect/2')).prepare() r1 = s.send(req, allow_redirects=False, stream=True) assert r1.is_redirect @@ -1754,18 +1733,16 @@ class TestRequests: r3 = next(rg) assert not r3.is_redirect - def test_prepare_body_position_non_stream(self): + def test_prepare_body_position_non_stream(self, s): data = b'the data' - s = requests.Session() prep = requests.Request( 'GET', 'http://example.com', data=data ).prepare( ) assert prep._body_position is None - def test_rewind_body(self): + def test_rewind_body(self, s): data = io.BytesIO(b'the data') - s = requests.Session() prep = requests.Request( 'GET', 'http://example.com', data=data ).prepare( @@ -1778,9 +1755,8 @@ class TestRequests: requests.utils.rewind_body(prep) assert prep.body.read() == b'the data' - def test_rewind_partially_read_body(self): + def test_rewind_partially_read_body(self, s): data = io.BytesIO(b'the data') - s = requests.Session() data.read(4) # read some data prep = requests.Request( 'GET', 'http://example.com', data=data @@ -1794,7 +1770,7 @@ class TestRequests: requests.utils.rewind_body(prep) assert prep.body.read() == b'data' - def test_rewind_body_no_seek(self): + def test_rewind_body_no_seek(self, s): class BadFileObj: @@ -1808,7 +1784,7 @@ class TestRequests: return data = BadFileObj('the data') - s = requests.Session() + prep = requests.Request( 'GET', 'http://example.com', data=data ).prepare( @@ -1818,7 +1794,7 @@ class TestRequests: requests.utils.rewind_body(prep) assert 'Unable to rewind request body' in str(e) - def test_rewind_body_failed_seek(self): + def test_rewind_body_failed_seek(self, s): class BadFileObj: @@ -1835,7 +1811,6 @@ class TestRequests: return data = BadFileObj('the data') - s = requests.Session() prep = requests.Request( 'GET', 'http://example.com', data=data ).prepare( @@ -1845,7 +1820,7 @@ class TestRequests: requests.utils.rewind_body(prep) assert 'error occurred when rewinding request body' in str(e) - def test_rewind_body_failed_tell(self): + def test_rewind_body_failed_tell(self, s): class BadFileObj: @@ -1859,7 +1834,6 @@ class TestRequests: return data = BadFileObj('the data') - s = requests.Session() prep = requests.Request( 'GET', 'http://example.com', data=data ).prepare( @@ -1883,8 +1857,7 @@ class TestRequests: adapter.build_response = build_response - def test_redirect_with_wrong_gzipped_header(self, httpbin): - s = requests.Session() + def test_redirect_with_wrong_gzipped_header(self, httpbin, s): url = httpbin('redirect/1') self._patch_adapter_gzipped_redirect(s, url) s.get(url) @@ -1945,8 +1918,7 @@ class TestRequests: assert isinstance(response, requests.Response) assert response.raw.closed - def test_unconsumed_session_response_closes_connection(self, httpbin): - s = requests.session() + def test_unconsumed_session_response_closes_connection(self, httpbin, s): with contextlib.closing( s.get(httpbin('stream/4'), stream=True) ) as response: @@ -1962,10 +1934,9 @@ class TestRequests: next(r.iter_lines()) assert len(list(r.iter_lines())) == 3 - def test_environment_comes_after_session(self, httpbin): + def test_environment_comes_after_session(self, httpbin, s): """The Session arguments should come before environment arguments.""" # We get proxies from the environment and verify from the argument. - s = requests.Session() a = SendRecordingAdapter() s.mount('http://', a) # Both of these arguments are safe fallbacks that we can easily @@ -1994,25 +1965,24 @@ class TestRequests: proxies['http'] @pytest.fixture(autouse=True) - def test_merge_environment_settings_verify(self, monkeypatch): + def test_merge_environment_settings_verify(self, monkeypatch, s): """Assert CA environment settings are merged as expected when missing""" - session = requests.Session() monkeypatch.delenv('CURL_CA_BUNDLE', raising=False) monkeypatch.delenv('REQUESTS_CA_BUNDLE', raising=False) - assert session.trust_env is True - assert session.verify is True + assert s.trust_env is True + assert s.verify is True assert 'REQUESTS_CA_BUNDLE' not in os.environ assert 'CURL_CA_BUNDLE' not in os.environ - merged_settings = session.merge_environment_settings( + merged_settings = s.merge_environment_settings( 'http://example.com', {}, False, True, None ) assert merged_settings['verify'] is True - def test_session_close_proxy_clear(self, mocker): + def test_session_close_proxy_clear(self, mocker, s): proxies = {'one': mocker.Mock(), 'two': mocker.Mock()} - session = requests.Session() - mocker.patch.dict(session.adapters['http://'].proxy_manager, proxies) - session.close() + + mocker.patch.dict(s.adapters['http://'].proxy_manager, proxies) + s.close() proxies['one'].clear.assert_called_once_with() proxies['two'].clear.assert_called_once_with() @@ -2046,17 +2016,16 @@ class TestRequests: resp.close() assert resp.raw.closed - def test_updating_ca_cert(self, httpbin_secure): + def test_updating_ca_cert(self, httpbin_secure, s): """Assert that requests use the latest configured CA certificates.""" - session = requests.session() - session.verify = pytest_httpbin.certs.where() - session.get(httpbin_secure('/')) - session.verify = True + s.verify = pytest_httpbin.certs.where() + s.get(httpbin_secure('/')) + s.verify = True with pytest.raises(requests.exceptions.SSLError) as e: - session.get(httpbin_secure('/')) + s.get(httpbin_secure('/')) assert 'certificate verify failed' in str(e) - def test_updating_client_cert(self, httpbin_secure): + def test_updating_client_cert(self, httpbin_secure, s): """Assert that requests use the latest configured client certificates.""" ca_file = pytest_httpbin.certs.where() cert_dir = os.path.dirname(ca_file) @@ -2065,10 +2034,10 @@ class TestRequests: # be the server's certificate and key. cert = os.path.join(cert_dir, 'cert.pem') key = os.path.join(cert_dir, 'key.pem') - session = requests.session() - session.verify = ca_file - resp = session.get(httpbin_secure('/')) - resp_with_cert = session.get(httpbin_secure('/'), cert=(cert, key)) + + s.verify = ca_file + resp = s.get(httpbin_secure('/')) + resp_with_cert = s.get(httpbin_secure('/'), cert=(cert, key)) assert resp_with_cert.raw._pool.cert_file == cert assert resp_with_cert.raw._pool.key_file == key assert resp.raw._pool is not resp_with_cert.raw._pool @@ -2591,8 +2560,8 @@ def test_requests_are_updated_each_time(httpbin): ('all_proxy', 'https://example.com', 'socks5://proxy.com:9876'), ], ) -def test_proxy_env_vars_override_default(var, url, proxy): - session = requests.Session() +def test_proxy_env_vars_override_default(var, url, proxy, s): + prep = PreparedRequest() prep.prepare(method='GET', url=url) kwargs = {var: proxy} @@ -2663,10 +2632,9 @@ def test_prepare_requires_a_request_method(): prepped.prepare() -def test_urllib3_retries(httpbin): +def test_urllib3_retries(httpbin, s): from urllib3.util import Retry - s = requests.Session() s.mount( 'http://', HTTPAdapter(max_retries=Retry(total=2, status_forcelist=[500])), @@ -2675,8 +2643,7 @@ def test_urllib3_retries(httpbin): s.get(httpbin('status/500')) -def test_urllib3_pool_connection_closed(httpbin): - s = requests.Session() +def test_urllib3_pool_connection_closed(httpbin, s): s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0)) try: s.get(httpbin('status/200')) |
