diff options
author | Hasan Ramezani <hasan.r67@gmail.com> | 2021-10-16 23:14:04 +0330 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-16 14:44:04 -0500 |
commit | 093ab6cc26b31fd1a36856e7130770d11f146e0e (patch) | |
tree | ae4efb7f2ed68ce7051794a9fb94bbe95d607910 | |
parent | 4bae662777e0df3ddcd53c2dc8c1b8049433723d (diff) | |
download | urllib3-093ab6cc26b31fd1a36856e7130770d11f146e0e.tar.gz |
Add type hints to test.test_response
-rw-r--r-- | noxfile.py | 1 | ||||
-rw-r--r-- | src/urllib3/exceptions.py | 2 | ||||
-rw-r--r-- | test/test_response.py | 315 |
3 files changed, 165 insertions, 153 deletions
@@ -190,6 +190,7 @@ def mypy(session: nox.Session) -> None: "test/test_poolmanager.py", "test/test_proxymanager.py", "test/test_queue_monkeypatch.py", + "test/test_response.py", "test/test_retry.py", "test/test_ssl.py", "test/test_util.py", diff --git a/src/urllib3/exceptions.py b/src/urllib3/exceptions.py index 2cb0a854..ab562906 100644 --- a/src/urllib3/exceptions.py +++ b/src/urllib3/exceptions.py @@ -309,7 +309,7 @@ class IncompleteRead(HTTPError, httplib_IncompleteRead): class InvalidChunkLength(HTTPError, httplib_IncompleteRead): """Invalid chunk length in a chunked response.""" - def __init__(self, response: "HTTPResponse", length: int) -> None: + def __init__(self, response: "HTTPResponse", length: bytes) -> None: self.partial: int = response.tell() self.expected: Optional[int] = response.length_remaining self.response = response diff --git a/test/test_response.py b/test/test_response.py index 8be12d9b..e5f92aa6 100644 --- a/test/test_response.py +++ b/test/test_response.py @@ -4,8 +4,10 @@ import socket import ssl import zlib from base64 import b64decode +from http.client import IncompleteRead as httplib_IncompleteRead from io import BufferedReader, BytesIO, TextIOWrapper from test import onlyBrotli +from typing import Any, Generator, List, Optional from unittest import mock import pytest @@ -19,9 +21,8 @@ from urllib3.exceptions import ( ProtocolError, ResponseNotChunked, SSLError, - httplib_IncompleteRead, ) -from urllib3.response import HTTPResponse, brotli +from urllib3.response import HTTPResponse, brotli # type: ignore[attr-defined] from urllib3.util.response import is_fp_closed from urllib3.util.retry import RequestHistory, Retry @@ -44,39 +45,39 @@ nP4HF2uWHA==""" @pytest.fixture -def sock(): +def sock() -> Generator[socket.socket, None, None]: s = socket.socket() yield s s.close() class TestLegacyResponse: - def test_getheaders(self): + def test_getheaders(self) -> None: headers = {"host": "example.com"} r = HTTPResponse(headers=headers) assert r.getheaders() == [("host", "example.com")] - def test_getheader(self): + def test_getheader(self) -> None: headers = {"host": "example.com"} r = HTTPResponse(headers=headers) assert r.getheader("host") == "example.com" class TestResponse: - def test_cache_content(self): - r = HTTPResponse("foo") - assert r.data == "foo" - assert r._body == "foo" + def test_cache_content(self) -> None: + r = HTTPResponse(b"foo") + assert r.data == b"foo" + assert r._body == b"foo" - def test_default(self): + def test_default(self) -> None: r = HTTPResponse() assert r.data is None - def test_none(self): - r = HTTPResponse(None) + def test_none(self) -> None: + r = HTTPResponse(None) # type: ignore[arg-type] assert r.data is None - def test_preload(self): + def test_preload(self) -> None: fp = BytesIO(b"foo") r = HTTPResponse(fp, preload_content=True) @@ -84,7 +85,7 @@ class TestResponse: assert fp.tell() == len(b"foo") assert r.data == b"foo" - def test_no_preload(self): + def test_no_preload(self) -> None: fp = BytesIO(b"foo") r = HTTPResponse(fp, preload_content=False) @@ -93,12 +94,12 @@ class TestResponse: assert r.data == b"foo" assert fp.tell() == len(b"foo") - def test_decode_bad_data(self): + def test_decode_bad_data(self) -> None: fp = BytesIO(b"\x00" * 10) with pytest.raises(DecodeError): HTTPResponse(fp, headers={"content-encoding": "deflate"}) - def test_reference_read(self): + def test_reference_read(self) -> None: fp = BytesIO(b"foo") r = HTTPResponse(fp, preload_content=False) @@ -107,7 +108,7 @@ class TestResponse: assert r.read() == b"" assert r.read() == b"" - def test_decode_deflate(self): + def test_decode_deflate(self) -> None: data = zlib.compress(b"foo") fp = BytesIO(data) @@ -115,7 +116,7 @@ class TestResponse: assert r.data == b"foo" - def test_decode_deflate_case_insensitve(self): + def test_decode_deflate_case_insensitve(self) -> None: data = zlib.compress(b"foo") fp = BytesIO(data) @@ -123,7 +124,7 @@ class TestResponse: assert r.data == b"foo" - def test_chunked_decoding_deflate(self): + def test_chunked_decoding_deflate(self) -> None: data = zlib.compress(b"foo") fp = BytesIO(data) @@ -133,15 +134,16 @@ class TestResponse: assert r.read(3) == b"" # Buffer in case we need to switch to the raw stream - assert r._decoder._data is not None + assert r._decoder is not None + assert r._decoder._data is not None # type: ignore[attr-defined] assert r.read(1) == b"f" # Now that we've decoded data, we just stream through the decoder - assert r._decoder._data is None + assert r._decoder._data is None # type: ignore[attr-defined] assert r.read(2) == b"oo" assert r.read() == b"" assert r.read() == b"" - def test_chunked_decoding_deflate2(self): + def test_chunked_decoding_deflate2(self) -> None: compress = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS) data = compress.compress(b"foo") data += compress.flush() @@ -154,12 +156,13 @@ class TestResponse: assert r.read(1) == b"" assert r.read(1) == b"f" # Once we've decoded data, we just stream to the decoder; no buffering - assert r._decoder._data is None + assert r._decoder is not None + assert r._decoder._data is None # type: ignore[attr-defined] assert r.read(2) == b"oo" assert r.read() == b"" assert r.read() == b"" - def test_chunked_decoding_gzip(self): + def test_chunked_decoding_gzip(self) -> None: compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) data = compress.compress(b"foo") data += compress.flush() @@ -175,7 +178,7 @@ class TestResponse: assert r.read() == b"" assert r.read() == b"" - def test_decode_gzip_multi_member(self): + def test_decode_gzip_multi_member(self) -> None: compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) data = compress.compress(b"foo") data += compress.flush() @@ -186,12 +189,12 @@ class TestResponse: assert r.data == b"foofoofoo" - def test_decode_gzip_error(self): + def test_decode_gzip_error(self) -> None: fp = BytesIO(b"foo") with pytest.raises(DecodeError): HTTPResponse(fp, headers={"content-encoding": "gzip"}) - def test_decode_gzip_swallow_garbage(self): + def test_decode_gzip_swallow_garbage(self) -> None: # When data comes from multiple calls to read(), data after # the first zlib error (here triggered by garbage) should be # ignored. @@ -212,7 +215,7 @@ class TestResponse: assert ret == b"foofoofoo" - def test_chunked_decoding_gzip_swallow_garbage(self): + def test_chunked_decoding_gzip_swallow_garbage(self) -> None: compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) data = compress.compress(b"foo") data += compress.flush() @@ -224,7 +227,7 @@ class TestResponse: assert r.data == b"foofoofoo" @onlyBrotli() - def test_decode_brotli(self): + def test_decode_brotli(self) -> None: data = brotli.compress(b"foo") fp = BytesIO(data) @@ -232,7 +235,7 @@ class TestResponse: assert r.data == b"foo" @onlyBrotli() - def test_chunked_decoding_brotli(self): + def test_chunked_decoding_brotli(self) -> None: data = brotli.compress(b"foobarbaz") fp = BytesIO(data) @@ -246,12 +249,12 @@ class TestResponse: assert ret == b"foobarbaz" @onlyBrotli() - def test_decode_brotli_error(self): + def test_decode_brotli_error(self) -> None: fp = BytesIO(b"foo") with pytest.raises(DecodeError): HTTPResponse(fp, headers={"content-encoding": "br"}) - def test_multi_decoding_deflate_deflate(self): + def test_multi_decoding_deflate_deflate(self) -> None: data = zlib.compress(zlib.compress(b"foo")) fp = BytesIO(data) @@ -259,7 +262,7 @@ class TestResponse: assert r.data == b"foo" - def test_multi_decoding_deflate_gzip(self): + def test_multi_decoding_deflate_gzip(self) -> None: compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) data = compress.compress(zlib.compress(b"foo")) data += compress.flush() @@ -269,7 +272,7 @@ class TestResponse: assert r.data == b"foo" - def test_multi_decoding_gzip_gzip(self): + def test_multi_decoding_gzip_gzip(self) -> None: compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) data = compress.compress(b"foo") data += compress.flush() @@ -283,12 +286,12 @@ class TestResponse: assert r.data == b"foo" - def test_body_blob(self): + def test_body_blob(self) -> None: resp = HTTPResponse(b"foo") assert resp.data == b"foo" assert resp.closed - def test_io(self, sock): + def test_io(self, sock: socket.socket) -> None: fp = BytesIO(b"foo") resp = HTTPResponse(fp, preload_content=False) @@ -317,21 +320,22 @@ class TestResponse: with pytest.raises(IOError): resp3.fileno() - resp3._fp = 2 + resp3._fp = 2 # type: ignore[assignment] # A corner case where _fp is present but doesn't have `closed`, # `isclosed`, or `fileno`. Unlikely, but possible. assert resp3.closed with pytest.raises(IOError): resp3.fileno() - def test_io_closed_consistently(self, sock): + def test_io_closed_consistently(self, sock: socket.socket) -> None: try: hlr = httplib.HTTPResponse(sock) - hlr.fp = BytesIO(b"foo") - hlr.chunked = 0 - hlr.length = 3 + hlr.fp = BytesIO(b"foo") # type: ignore[attr-defined] + hlr.chunked = 0 # type: ignore[attr-defined] + hlr.length = 3 # type: ignore[attr-defined] with HTTPResponse(hlr, preload_content=False) as resp: assert not resp.closed + assert resp._fp is not None assert not resp._fp.isclosed() assert not is_fp_closed(resp._fp) assert not resp.isclosed() @@ -343,10 +347,10 @@ class TestResponse: finally: hlr.close() - def test_io_bufferedreader(self): + def test_io_bufferedreader(self) -> None: fp = BytesIO(b"foo") resp = HTTPResponse(fp, preload_content=False) - br = BufferedReader(resp) + br = BufferedReader(resp) # type: ignore[arg-type] assert br.read() == b"foo" @@ -358,12 +362,12 @@ class TestResponse: fp = BytesIO(b"hello\nworld") resp = HTTPResponse(fp, preload_content=False) with pytest.raises(ValueError, match="readline of closed file"): - list(BufferedReader(resp)) + list(BufferedReader(resp)) # type: ignore[arg-type] b = b"fooandahalf" fp = BytesIO(b) resp = HTTPResponse(fp, preload_content=False) - br = BufferedReader(resp, 5) + br = BufferedReader(resp, 5) # type: ignore[arg-type] br.read(1) # sets up the buffer, reading 5 assert len(fp.read()) == (len(b) - 5) @@ -373,10 +377,10 @@ class TestResponse: while not br.closed: br.read(5) - def test_io_not_autoclose_bufferedreader(self): + def test_io_not_autoclose_bufferedreader(self) -> None: fp = BytesIO(b"hello\nworld") resp = HTTPResponse(fp, preload_content=False, auto_close=False) - reader = BufferedReader(resp) + reader = BufferedReader(resp) # type: ignore[arg-type] assert list(reader) == [b"hello\n", b"world"] assert not reader.closed @@ -390,10 +394,10 @@ class TestResponse: with pytest.raises(ValueError, match="readline of closed file"): next(reader) - def test_io_textiowrapper(self): + def test_io_textiowrapper(self) -> None: fp = BytesIO(b"\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f") resp = HTTPResponse(fp, preload_content=False) - br = TextIOWrapper(resp, encoding="utf8") + br = TextIOWrapper(resp, encoding="utf8") # type: ignore[arg-type] assert br.read() == "äöüß" @@ -407,14 +411,14 @@ class TestResponse: ) resp = HTTPResponse(fp, preload_content=False) with pytest.raises(ValueError, match="I/O operation on closed file.?"): - list(TextIOWrapper(resp)) + list(TextIOWrapper(resp)) # type: ignore[arg-type] - def test_io_not_autoclose_textiowrapper(self): + def test_io_not_autoclose_textiowrapper(self) -> None: fp = BytesIO( b"\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f\n\xce\xb1\xce\xb2\xce\xb3\xce\xb4" ) resp = HTTPResponse(fp, preload_content=False, auto_close=False) - reader = TextIOWrapper(resp, encoding="utf8") + reader = TextIOWrapper(resp, encoding="utf8") # type: ignore[arg-type] assert list(reader) == ["äöüß\n", "αβγδ"] assert not reader.closed @@ -428,7 +432,7 @@ class TestResponse: with pytest.raises(ValueError, match="I/O operation on closed file.?"): next(reader) - def test_streaming(self): + def test_streaming(self) -> None: fp = BytesIO(b"foo") resp = HTTPResponse(fp, preload_content=False) stream = resp.stream(2, decode_content=False) @@ -438,7 +442,7 @@ class TestResponse: with pytest.raises(StopIteration): next(stream) - def test_streaming_tell(self): + def test_streaming_tell(self) -> None: fp = BytesIO(b"foo") resp = HTTPResponse(fp, preload_content=False) stream = resp.stream(2, decode_content=False) @@ -456,7 +460,7 @@ class TestResponse: with pytest.raises(StopIteration): next(stream) - def test_gzipped_streaming(self): + def test_gzipped_streaming(self) -> None: compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) data = compress.compress(b"foo") data += compress.flush() @@ -472,7 +476,7 @@ class TestResponse: with pytest.raises(StopIteration): next(stream) - def test_gzipped_streaming_tell(self): + def test_gzipped_streaming_tell(self) -> None: compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) uncompressed_data = b"foo" data = compress.compress(uncompressed_data) @@ -493,7 +497,7 @@ class TestResponse: with pytest.raises(StopIteration): next(stream) - def test_deflate_streaming_tell_intermediate_point(self): + def test_deflate_streaming_tell_intermediate_point(self) -> None: # Ensure that ``tell()`` returns the correct number of bytes when # part-way through streaming compressed content. NUMBER_OF_READS = 10 @@ -504,7 +508,7 @@ class TestResponse: calls to ``read``. """ - def __init__(self, payload, payload_part_size): + def __init__(self, payload: bytes, payload_part_size: int) -> None: self.payloads = [ payload[i * payload_part_size : (i + 1) * payload_part_size] for i in range(NUMBER_OF_READS + 1) @@ -512,7 +516,7 @@ class TestResponse: assert b"".join(self.payloads) == payload - def read(self, _): + def read(self, _: int) -> bytes: # type: ignore[override] # Amount is unused. if len(self.payloads) > 0: return self.payloads.pop(0) @@ -546,7 +550,7 @@ class TestResponse: # Check that the end of the stream is in the correct place assert len(ZLIB_PAYLOAD) == end_of_stream - def test_deflate_streaming(self): + def test_deflate_streaming(self) -> None: data = zlib.compress(b"foo") fp = BytesIO(data) @@ -560,7 +564,7 @@ class TestResponse: with pytest.raises(StopIteration): next(stream) - def test_deflate2_streaming(self): + def test_deflate2_streaming(self) -> None: compress = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS) data = compress.compress(b"foo") data += compress.flush() @@ -576,7 +580,7 @@ class TestResponse: with pytest.raises(StopIteration): next(stream) - def test_empty_stream(self): + def test_empty_stream(self) -> None: fp = BytesIO(b"") resp = HTTPResponse(fp, preload_content=False) stream = resp.stream(2, decode_content=False) @@ -584,19 +588,19 @@ class TestResponse: with pytest.raises(StopIteration): next(stream) - def test_length_no_header(self): + def test_length_no_header(self) -> None: fp = BytesIO(b"12345") resp = HTTPResponse(fp, preload_content=False) assert resp.length_remaining is None - def test_length_w_valid_header(self): + def test_length_w_valid_header(self) -> None: headers = {"content-length": "5"} fp = BytesIO(b"12345") resp = HTTPResponse(fp, headers=headers, preload_content=False) assert resp.length_remaining == 5 - def test_length_w_bad_header(self): + def test_length_w_bad_header(self) -> None: garbage = {"content-length": "foo"} fp = BytesIO(b"12345") @@ -607,7 +611,7 @@ class TestResponse: resp = HTTPResponse(fp, headers=garbage, preload_content=False) assert resp.length_remaining is None - def test_length_when_chunked(self): + def test_length_when_chunked(self) -> None: # This is expressly forbidden in RFC 7230 sec 3.3.2 # We fall back to chunked in this case and try to # handle response ignoring content length. @@ -617,7 +621,7 @@ class TestResponse: resp = HTTPResponse(fp, headers=headers, preload_content=False) assert resp.length_remaining is None - def test_length_with_multiple_content_lengths(self): + def test_length_with_multiple_content_lengths(self) -> None: headers = {"content-length": "5, 5, 5"} garbage = {"content-length": "5, 42"} fp = BytesIO(b"abcde") @@ -628,7 +632,7 @@ class TestResponse: with pytest.raises(InvalidHeader): HTTPResponse(fp, headers=garbage, preload_content=False) - def test_length_after_read(self): + def test_length_after_read(self) -> None: headers = {"content-length": "5"} # Test no defined length @@ -650,27 +654,29 @@ class TestResponse: next(data) assert resp.length_remaining == 3 - def test_mock_httpresponse_stream(self): + def test_mock_httpresponse_stream(self) -> None: # Mock out a HTTP Request that does enough to make it through urllib3's # read() and close() calls, and also exhausts and underlying file # object. class MockHTTPRequest: - self.fp = None + def __init__(self) -> None: + self.fp: Optional[BytesIO] = None - def read(self, amt): + def read(self, amt: int) -> bytes: + assert self.fp is not None data = self.fp.read(amt) if not data: self.fp = None return data - def close(self): + def close(self) -> None: self.fp = None bio = BytesIO(b"foo") fp = MockHTTPRequest() fp.fp = bio - resp = HTTPResponse(fp, preload_content=False) + resp = HTTPResponse(fp, preload_content=False) # type: ignore[arg-type] stream = resp.stream(2) assert next(stream) == b"fo" @@ -678,11 +684,11 @@ class TestResponse: with pytest.raises(StopIteration): next(stream) - def test_mock_transfer_encoding_chunked(self): + def test_mock_transfer_encoding_chunked(self) -> None: stream = [b"fo", b"o", b"bar"] fp = MockChunkedEncodingResponse(stream) - r = httplib.HTTPResponse(MockSock) - r.fp = fp + r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type] + r.fp = fp # type: ignore[attr-defined] resp = HTTPResponse( r, preload_content=False, headers={"transfer-encoding": "chunked"} ) @@ -690,10 +696,10 @@ class TestResponse: for i, c in enumerate(resp.stream()): assert c == stream[i] - def test_mock_gzipped_transfer_encoding_chunked_decoded(self): + def test_mock_gzipped_transfer_encoding_chunked_decoded(self) -> None: """Show that we can decode the gzipped and chunked body.""" - def stream(): + def stream() -> Generator[bytes, None, None]: # Set up a generator to chunk the gzipped body compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) data = compress.compress(b"foobar") @@ -702,8 +708,8 @@ class TestResponse: yield data[i : i + 2] fp = MockChunkedEncodingResponse(list(stream())) - r = httplib.HTTPResponse(MockSock) - r.fp = fp + r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type] + r.fp = fp # type: ignore[attr-defined] headers = {"transfer-encoding": "chunked", "content-encoding": "gzip"} resp = HTTPResponse(r, preload_content=False, headers=headers) @@ -713,13 +719,13 @@ class TestResponse: assert b"foobar" == data - def test_mock_transfer_encoding_chunked_custom_read(self): + def test_mock_transfer_encoding_chunked_custom_read(self) -> None: stream = [b"foooo", b"bbbbaaaaar"] fp = MockChunkedEncodingResponse(stream) - r = httplib.HTTPResponse(MockSock) - r.fp = fp - r.chunked = True - r.chunk_left = None + r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type] + r.fp = fp # type: ignore[attr-defined] + r.chunked = True # type: ignore[attr-defined] + r.chunk_left = None # type: ignore[attr-defined] resp = HTTPResponse( r, preload_content=False, headers={"transfer-encoding": "chunked"} ) @@ -727,26 +733,26 @@ class TestResponse: response = list(resp.read_chunked(2)) assert expected_response == response - def test_mock_transfer_encoding_chunked_unlmtd_read(self): + def test_mock_transfer_encoding_chunked_unlmtd_read(self) -> None: stream = [b"foooo", b"bbbbaaaaar"] fp = MockChunkedEncodingResponse(stream) - r = httplib.HTTPResponse(MockSock) - r.fp = fp - r.chunked = True - r.chunk_left = None + r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type] + r.fp = fp # type: ignore[attr-defined] + r.chunked = True # type: ignore[attr-defined] + r.chunk_left = None # type: ignore[attr-defined] resp = HTTPResponse( r, preload_content=False, headers={"transfer-encoding": "chunked"} ) assert stream == list(resp.read_chunked()) - def test_read_not_chunked_response_as_chunks(self): + def test_read_not_chunked_response_as_chunks(self) -> None: fp = BytesIO(b"foo") resp = HTTPResponse(fp, preload_content=False) r = resp.read_chunked() with pytest.raises(ResponseNotChunked): next(r) - def test_read_chunked_not_supported(self): + def test_read_chunked_not_supported(self) -> None: fp = BytesIO(b"foo") resp = HTTPResponse( fp, preload_content=False, headers={"transfer-encoding": "chunked"} @@ -755,7 +761,7 @@ class TestResponse: with pytest.raises(BodyNotHttplibCompatible): next(r) - def test_buggy_incomplete_read(self): + def test_buggy_incomplete_read(self) -> None: # Simulate buggy versions of Python (<2.7.4) # See http://bugs.python.org/issue16298 content_length = 1337 @@ -774,13 +780,13 @@ class TestResponse: assert orig_ex.partial == 0 assert orig_ex.expected == content_length - def test_incomplete_chunk(self): + def test_incomplete_chunk(self) -> None: stream = [b"foooo", b"bbbbaaaaar"] fp = MockChunkedIncompleteRead(stream) - r = httplib.HTTPResponse(MockSock) - r.fp = fp - r.chunked = True - r.chunk_left = None + r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type] + r.fp = fp # type: ignore[attr-defined] + r.chunked = True # type: ignore[attr-defined] + r.chunk_left = None # type: ignore[attr-defined] resp = HTTPResponse( r, preload_content=False, headers={"transfer-encoding": "chunked"} ) @@ -790,13 +796,13 @@ class TestResponse: orig_ex = ctx.value.args[1] assert isinstance(orig_ex, httplib_IncompleteRead) - def test_invalid_chunk_length(self): + def test_invalid_chunk_length(self) -> None: stream = [b"foooo", b"bbbbaaaaar"] fp = MockChunkedInvalidChunkLength(stream) - r = httplib.HTTPResponse(MockSock) - r.fp = fp - r.chunked = True - r.chunk_left = None + r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type] + r.fp = fp # type: ignore[attr-defined] + r.chunked = True # type: ignore[attr-defined] + r.chunk_left = None # type: ignore[attr-defined] resp = HTTPResponse( r, preload_content=False, headers={"transfer-encoding": "chunked"} ) @@ -804,37 +810,42 @@ class TestResponse: next(resp.read_chunked()) orig_ex = ctx.value.args[1] + msg = ( + "(\"Connection broken: InvalidChunkLength(got length b'ZZZ\\\\r\\\\n', 0 bytes read)\", " + "InvalidChunkLength(got length b'ZZZ\\r\\n', 0 bytes read))" + ) + assert str(ctx.value) == msg assert isinstance(orig_ex, InvalidChunkLength) assert orig_ex.length == fp.BAD_LENGTH_LINE.encode() - def test_chunked_response_without_crlf_on_end(self): + def test_chunked_response_without_crlf_on_end(self) -> None: stream = [b"foo", b"bar", b"baz"] fp = MockChunkedEncodingWithoutCRLFOnEnd(stream) - r = httplib.HTTPResponse(MockSock) - r.fp = fp - r.chunked = True - r.chunk_left = None + r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type] + r.fp = fp # type: ignore[attr-defined] + r.chunked = True # type: ignore[attr-defined] + r.chunk_left = None # type: ignore[attr-defined] resp = HTTPResponse( r, preload_content=False, headers={"transfer-encoding": "chunked"} ) assert stream == list(resp.stream()) - def test_chunked_response_with_extensions(self): + def test_chunked_response_with_extensions(self) -> None: stream = [b"foo", b"bar"] fp = MockChunkedEncodingWithExtensions(stream) - r = httplib.HTTPResponse(MockSock) - r.fp = fp - r.chunked = True - r.chunk_left = None + r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type] + r.fp = fp # type: ignore[attr-defined] + r.chunked = True # type: ignore[attr-defined] + r.chunk_left = None # type: ignore[attr-defined] resp = HTTPResponse( r, preload_content=False, headers={"transfer-encoding": "chunked"} ) assert stream == list(resp.stream()) - def test_chunked_head_response(self): - r = httplib.HTTPResponse(MockSock, method="HEAD") - r.chunked = True - r.chunk_left = None + def test_chunked_head_response(self) -> None: + r = httplib.HTTPResponse(MockSock, method="HEAD") # type: ignore[arg-type] + r.chunked = True # type: ignore[attr-defined] + r.chunk_left = None # type: ignore[attr-defined] resp = HTTPResponse( "", preload_content=False, @@ -843,19 +854,19 @@ class TestResponse: ) assert resp.chunked is True - resp.supports_chunked_reads = lambda: True - resp.release_conn = mock.Mock() + setattr(resp, "supports_chunked_reads", lambda: True) + setattr(resp, "release_conn", mock.Mock()) for _ in resp.stream(): continue - resp.release_conn.assert_called_once_with() + resp.release_conn.assert_called_once_with() # type: ignore[attr-defined] - def test_get_case_insensitive_headers(self): + def test_get_case_insensitive_headers(self) -> None: headers = {"host": "example.com"} r = HTTPResponse(headers=headers) assert r.headers.get("host") == "example.com" assert r.headers.get("Host") == "example.com" - def test_retries(self): + def test_retries(self) -> None: fp = BytesIO(b"") resp = HTTPResponse(fp) assert resp.retries is None @@ -863,13 +874,13 @@ class TestResponse: resp = HTTPResponse(fp, retries=retry) assert resp.retries == retry - def test_geturl(self): + def test_geturl(self) -> None: fp = BytesIO(b"") request_url = "https://example.com" resp = HTTPResponse(fp, request_url=request_url) assert resp.geturl() == request_url - def test_url(self): + def test_url(self) -> None: fp = BytesIO(b"") request_url = "https://example.com" resp = HTTPResponse(fp, request_url=request_url) @@ -877,10 +888,10 @@ class TestResponse: resp.url = "https://anotherurl.com" assert resp.url == "https://anotherurl.com" - def test_geturl_retries(self): + def test_geturl_retries(self) -> None: fp = BytesIO(b"") resp = HTTPResponse(fp, request_url="http://example.com") - request_histories = [ + request_histories = ( RequestHistory( method="GET", url="http://example.com", @@ -895,7 +906,7 @@ class TestResponse: status=301, redirect_location="https://www.example.com", ), - ] + ) retry = Retry(history=request_histories) resp = HTTPResponse(fp, retries=retry) assert resp.geturl() == "https://www.example.com" @@ -910,15 +921,15 @@ class TestResponse: (b"Hello\nworld\n\n\n!", [b"Hello\n", b"world\n", b"\n", b"\n", b"!"]), ], ) - def test__iter__(self, payload, expected_stream): + def test__iter__(self, payload: bytes, expected_stream: List[bytes]) -> None: actual_stream = [] for chunk in HTTPResponse(BytesIO(payload), preload_content=False): actual_stream.append(chunk) assert actual_stream == expected_stream - def test__iter__decode_content(self): - def stream(): + def test__iter__decode_content(self) -> None: + def stream() -> Generator[bytes, None, None]: # Set up a generator to chunk the gzipped body compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) data = compress.compress(b"foo\nbar") @@ -927,8 +938,8 @@ class TestResponse: yield data[i : i + 2] fp = MockChunkedEncodingResponse(list(stream())) - r = httplib.HTTPResponse(MockSock) - r.fp = fp + r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type] + r.fp = fp # type: ignore[attr-defined] headers = {"transfer-encoding": "chunked", "content-encoding": "gzip"} resp = HTTPResponse(r, preload_content=False, headers=headers) @@ -938,13 +949,13 @@ class TestResponse: assert b"foo\nbar" == data - def test_non_timeout_ssl_error_on_read(self): + def test_non_timeout_ssl_error_on_read(self) -> None: mac_error = ssl.SSLError( "SSL routines", "ssl3_get_record", "decryption failed or bad record mac" ) @contextlib.contextmanager - def make_bad_mac_fp(): + def make_bad_mac_fp() -> Generator[BytesIO, None, None]: fp = BytesIO(b"") with mock.patch.object(fp, "read") as fp_read: # mac/decryption error @@ -964,7 +975,7 @@ class TestResponse: class MockChunkedEncodingResponse: - def __init__(self, content): + def __init__(self, content: List[bytes]) -> None: """ content: collection of str, each str is a chunk in response """ @@ -974,13 +985,12 @@ class MockChunkedEncodingResponse: self.cur_chunk = b"" self.chunks_exhausted = False - @staticmethod - def _encode_chunk(chunk): + def _encode_chunk(self, chunk: bytes) -> bytes: # In the general case, we can't decode the chunk to unicode length = f"{len(chunk):X}\r\n" return length.encode() + chunk + b"\r\n" - def _pop_new_chunk(self): + def _pop_new_chunk(self) -> bytes: if self.chunks_exhausted: return b"" try: @@ -993,9 +1003,10 @@ class MockChunkedEncodingResponse: chunk = self._encode_chunk(chunk) if not isinstance(chunk, bytes): chunk = chunk.encode() + assert isinstance(chunk, bytes) return chunk - def pop_current_chunk(self, amt=-1, till_crlf=False): + def pop_current_chunk(self, amt: int = -1, till_crlf: bool = False) -> bytes: if amt > 0 and till_crlf: raise ValueError("Can't specify amt and till_crlf.") if len(self.cur_chunk) <= 0: @@ -1025,47 +1036,47 @@ class MockChunkedEncodingResponse: self.cur_chunk = self.cur_chunk[amt:] return chunk_part - def readline(self): + def readline(self) -> bytes: return self.pop_current_chunk(till_crlf=True) - def read(self, amt=-1): + def read(self, amt: int = -1) -> bytes: return self.pop_current_chunk(amt) - def flush(self): + def flush(self) -> None: # Python 3 wants this method. pass - def close(self): + def close(self) -> None: self.closed = True class MockChunkedIncompleteRead(MockChunkedEncodingResponse): - def _encode_chunk(self, chunk): - return f"9999\r\n{chunk.decode()}\r\n" + def _encode_chunk(self, chunk: bytes) -> bytes: + return f"9999\r\n{chunk.decode()}\r\n".encode() class MockChunkedInvalidChunkLength(MockChunkedEncodingResponse): BAD_LENGTH_LINE = "ZZZ\r\n" - def _encode_chunk(self, chunk): - return f"{self.BAD_LENGTH_LINE}{chunk.decode()}\r\n" + def _encode_chunk(self, chunk: bytes) -> bytes: + return f"{self.BAD_LENGTH_LINE}{chunk.decode()}\r\n".encode() class MockChunkedEncodingWithoutCRLFOnEnd(MockChunkedEncodingResponse): - def _encode_chunk(self, chunk): + def _encode_chunk(self, chunk: bytes) -> bytes: return "{:X}\r\n{}{}".format( len(chunk), chunk.decode(), "\r\n" if len(chunk) > 0 else "", - ) + ).encode() class MockChunkedEncodingWithExtensions(MockChunkedEncodingResponse): - def _encode_chunk(self, chunk): - return f"{len(chunk):X};asd=qwe\r\n{chunk.decode()}\r\n" + def _encode_chunk(self, chunk: bytes) -> bytes: + return f"{len(chunk):X};asd=qwe\r\n{chunk.decode()}\r\n".encode() class MockSock: @classmethod - def makefile(cls, *args, **kwargs): + def makefile(cls, *args: Any, **kwargs: Any) -> None: return |