summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHasan Ramezani <hasan.r67@gmail.com>2021-10-16 23:14:04 +0330
committerGitHub <noreply@github.com>2021-10-16 14:44:04 -0500
commit093ab6cc26b31fd1a36856e7130770d11f146e0e (patch)
treeae4efb7f2ed68ce7051794a9fb94bbe95d607910
parent4bae662777e0df3ddcd53c2dc8c1b8049433723d (diff)
downloadurllib3-093ab6cc26b31fd1a36856e7130770d11f146e0e.tar.gz
Add type hints to test.test_response
-rw-r--r--noxfile.py1
-rw-r--r--src/urllib3/exceptions.py2
-rw-r--r--test/test_response.py315
3 files changed, 165 insertions, 153 deletions
diff --git a/noxfile.py b/noxfile.py
index dc202553..3cb4eb5b 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -190,6 +190,7 @@ def mypy(session: nox.Session) -> None:
"test/test_poolmanager.py",
"test/test_proxymanager.py",
"test/test_queue_monkeypatch.py",
+ "test/test_response.py",
"test/test_retry.py",
"test/test_ssl.py",
"test/test_util.py",
diff --git a/src/urllib3/exceptions.py b/src/urllib3/exceptions.py
index 2cb0a854..ab562906 100644
--- a/src/urllib3/exceptions.py
+++ b/src/urllib3/exceptions.py
@@ -309,7 +309,7 @@ class IncompleteRead(HTTPError, httplib_IncompleteRead):
class InvalidChunkLength(HTTPError, httplib_IncompleteRead):
"""Invalid chunk length in a chunked response."""
- def __init__(self, response: "HTTPResponse", length: int) -> None:
+ def __init__(self, response: "HTTPResponse", length: bytes) -> None:
self.partial: int = response.tell()
self.expected: Optional[int] = response.length_remaining
self.response = response
diff --git a/test/test_response.py b/test/test_response.py
index 8be12d9b..e5f92aa6 100644
--- a/test/test_response.py
+++ b/test/test_response.py
@@ -4,8 +4,10 @@ import socket
import ssl
import zlib
from base64 import b64decode
+from http.client import IncompleteRead as httplib_IncompleteRead
from io import BufferedReader, BytesIO, TextIOWrapper
from test import onlyBrotli
+from typing import Any, Generator, List, Optional
from unittest import mock
import pytest
@@ -19,9 +21,8 @@ from urllib3.exceptions import (
ProtocolError,
ResponseNotChunked,
SSLError,
- httplib_IncompleteRead,
)
-from urllib3.response import HTTPResponse, brotli
+from urllib3.response import HTTPResponse, brotli # type: ignore[attr-defined]
from urllib3.util.response import is_fp_closed
from urllib3.util.retry import RequestHistory, Retry
@@ -44,39 +45,39 @@ nP4HF2uWHA=="""
@pytest.fixture
-def sock():
+def sock() -> Generator[socket.socket, None, None]:
s = socket.socket()
yield s
s.close()
class TestLegacyResponse:
- def test_getheaders(self):
+ def test_getheaders(self) -> None:
headers = {"host": "example.com"}
r = HTTPResponse(headers=headers)
assert r.getheaders() == [("host", "example.com")]
- def test_getheader(self):
+ def test_getheader(self) -> None:
headers = {"host": "example.com"}
r = HTTPResponse(headers=headers)
assert r.getheader("host") == "example.com"
class TestResponse:
- def test_cache_content(self):
- r = HTTPResponse("foo")
- assert r.data == "foo"
- assert r._body == "foo"
+ def test_cache_content(self) -> None:
+ r = HTTPResponse(b"foo")
+ assert r.data == b"foo"
+ assert r._body == b"foo"
- def test_default(self):
+ def test_default(self) -> None:
r = HTTPResponse()
assert r.data is None
- def test_none(self):
- r = HTTPResponse(None)
+ def test_none(self) -> None:
+ r = HTTPResponse(None) # type: ignore[arg-type]
assert r.data is None
- def test_preload(self):
+ def test_preload(self) -> None:
fp = BytesIO(b"foo")
r = HTTPResponse(fp, preload_content=True)
@@ -84,7 +85,7 @@ class TestResponse:
assert fp.tell() == len(b"foo")
assert r.data == b"foo"
- def test_no_preload(self):
+ def test_no_preload(self) -> None:
fp = BytesIO(b"foo")
r = HTTPResponse(fp, preload_content=False)
@@ -93,12 +94,12 @@ class TestResponse:
assert r.data == b"foo"
assert fp.tell() == len(b"foo")
- def test_decode_bad_data(self):
+ def test_decode_bad_data(self) -> None:
fp = BytesIO(b"\x00" * 10)
with pytest.raises(DecodeError):
HTTPResponse(fp, headers={"content-encoding": "deflate"})
- def test_reference_read(self):
+ def test_reference_read(self) -> None:
fp = BytesIO(b"foo")
r = HTTPResponse(fp, preload_content=False)
@@ -107,7 +108,7 @@ class TestResponse:
assert r.read() == b""
assert r.read() == b""
- def test_decode_deflate(self):
+ def test_decode_deflate(self) -> None:
data = zlib.compress(b"foo")
fp = BytesIO(data)
@@ -115,7 +116,7 @@ class TestResponse:
assert r.data == b"foo"
- def test_decode_deflate_case_insensitve(self):
+ def test_decode_deflate_case_insensitve(self) -> None:
data = zlib.compress(b"foo")
fp = BytesIO(data)
@@ -123,7 +124,7 @@ class TestResponse:
assert r.data == b"foo"
- def test_chunked_decoding_deflate(self):
+ def test_chunked_decoding_deflate(self) -> None:
data = zlib.compress(b"foo")
fp = BytesIO(data)
@@ -133,15 +134,16 @@ class TestResponse:
assert r.read(3) == b""
# Buffer in case we need to switch to the raw stream
- assert r._decoder._data is not None
+ assert r._decoder is not None
+ assert r._decoder._data is not None # type: ignore[attr-defined]
assert r.read(1) == b"f"
# Now that we've decoded data, we just stream through the decoder
- assert r._decoder._data is None
+ assert r._decoder._data is None # type: ignore[attr-defined]
assert r.read(2) == b"oo"
assert r.read() == b""
assert r.read() == b""
- def test_chunked_decoding_deflate2(self):
+ def test_chunked_decoding_deflate2(self) -> None:
compress = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
data = compress.compress(b"foo")
data += compress.flush()
@@ -154,12 +156,13 @@ class TestResponse:
assert r.read(1) == b""
assert r.read(1) == b"f"
# Once we've decoded data, we just stream to the decoder; no buffering
- assert r._decoder._data is None
+ assert r._decoder is not None
+ assert r._decoder._data is None # type: ignore[attr-defined]
assert r.read(2) == b"oo"
assert r.read() == b""
assert r.read() == b""
- def test_chunked_decoding_gzip(self):
+ def test_chunked_decoding_gzip(self) -> None:
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
data = compress.compress(b"foo")
data += compress.flush()
@@ -175,7 +178,7 @@ class TestResponse:
assert r.read() == b""
assert r.read() == b""
- def test_decode_gzip_multi_member(self):
+ def test_decode_gzip_multi_member(self) -> None:
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
data = compress.compress(b"foo")
data += compress.flush()
@@ -186,12 +189,12 @@ class TestResponse:
assert r.data == b"foofoofoo"
- def test_decode_gzip_error(self):
+ def test_decode_gzip_error(self) -> None:
fp = BytesIO(b"foo")
with pytest.raises(DecodeError):
HTTPResponse(fp, headers={"content-encoding": "gzip"})
- def test_decode_gzip_swallow_garbage(self):
+ def test_decode_gzip_swallow_garbage(self) -> None:
# When data comes from multiple calls to read(), data after
# the first zlib error (here triggered by garbage) should be
# ignored.
@@ -212,7 +215,7 @@ class TestResponse:
assert ret == b"foofoofoo"
- def test_chunked_decoding_gzip_swallow_garbage(self):
+ def test_chunked_decoding_gzip_swallow_garbage(self) -> None:
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
data = compress.compress(b"foo")
data += compress.flush()
@@ -224,7 +227,7 @@ class TestResponse:
assert r.data == b"foofoofoo"
@onlyBrotli()
- def test_decode_brotli(self):
+ def test_decode_brotli(self) -> None:
data = brotli.compress(b"foo")
fp = BytesIO(data)
@@ -232,7 +235,7 @@ class TestResponse:
assert r.data == b"foo"
@onlyBrotli()
- def test_chunked_decoding_brotli(self):
+ def test_chunked_decoding_brotli(self) -> None:
data = brotli.compress(b"foobarbaz")
fp = BytesIO(data)
@@ -246,12 +249,12 @@ class TestResponse:
assert ret == b"foobarbaz"
@onlyBrotli()
- def test_decode_brotli_error(self):
+ def test_decode_brotli_error(self) -> None:
fp = BytesIO(b"foo")
with pytest.raises(DecodeError):
HTTPResponse(fp, headers={"content-encoding": "br"})
- def test_multi_decoding_deflate_deflate(self):
+ def test_multi_decoding_deflate_deflate(self) -> None:
data = zlib.compress(zlib.compress(b"foo"))
fp = BytesIO(data)
@@ -259,7 +262,7 @@ class TestResponse:
assert r.data == b"foo"
- def test_multi_decoding_deflate_gzip(self):
+ def test_multi_decoding_deflate_gzip(self) -> None:
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
data = compress.compress(zlib.compress(b"foo"))
data += compress.flush()
@@ -269,7 +272,7 @@ class TestResponse:
assert r.data == b"foo"
- def test_multi_decoding_gzip_gzip(self):
+ def test_multi_decoding_gzip_gzip(self) -> None:
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
data = compress.compress(b"foo")
data += compress.flush()
@@ -283,12 +286,12 @@ class TestResponse:
assert r.data == b"foo"
- def test_body_blob(self):
+ def test_body_blob(self) -> None:
resp = HTTPResponse(b"foo")
assert resp.data == b"foo"
assert resp.closed
- def test_io(self, sock):
+ def test_io(self, sock: socket.socket) -> None:
fp = BytesIO(b"foo")
resp = HTTPResponse(fp, preload_content=False)
@@ -317,21 +320,22 @@ class TestResponse:
with pytest.raises(IOError):
resp3.fileno()
- resp3._fp = 2
+ resp3._fp = 2 # type: ignore[assignment]
# A corner case where _fp is present but doesn't have `closed`,
# `isclosed`, or `fileno`. Unlikely, but possible.
assert resp3.closed
with pytest.raises(IOError):
resp3.fileno()
- def test_io_closed_consistently(self, sock):
+ def test_io_closed_consistently(self, sock: socket.socket) -> None:
try:
hlr = httplib.HTTPResponse(sock)
- hlr.fp = BytesIO(b"foo")
- hlr.chunked = 0
- hlr.length = 3
+ hlr.fp = BytesIO(b"foo") # type: ignore[attr-defined]
+ hlr.chunked = 0 # type: ignore[attr-defined]
+ hlr.length = 3 # type: ignore[attr-defined]
with HTTPResponse(hlr, preload_content=False) as resp:
assert not resp.closed
+ assert resp._fp is not None
assert not resp._fp.isclosed()
assert not is_fp_closed(resp._fp)
assert not resp.isclosed()
@@ -343,10 +347,10 @@ class TestResponse:
finally:
hlr.close()
- def test_io_bufferedreader(self):
+ def test_io_bufferedreader(self) -> None:
fp = BytesIO(b"foo")
resp = HTTPResponse(fp, preload_content=False)
- br = BufferedReader(resp)
+ br = BufferedReader(resp) # type: ignore[arg-type]
assert br.read() == b"foo"
@@ -358,12 +362,12 @@ class TestResponse:
fp = BytesIO(b"hello\nworld")
resp = HTTPResponse(fp, preload_content=False)
with pytest.raises(ValueError, match="readline of closed file"):
- list(BufferedReader(resp))
+ list(BufferedReader(resp)) # type: ignore[arg-type]
b = b"fooandahalf"
fp = BytesIO(b)
resp = HTTPResponse(fp, preload_content=False)
- br = BufferedReader(resp, 5)
+ br = BufferedReader(resp, 5) # type: ignore[arg-type]
br.read(1) # sets up the buffer, reading 5
assert len(fp.read()) == (len(b) - 5)
@@ -373,10 +377,10 @@ class TestResponse:
while not br.closed:
br.read(5)
- def test_io_not_autoclose_bufferedreader(self):
+ def test_io_not_autoclose_bufferedreader(self) -> None:
fp = BytesIO(b"hello\nworld")
resp = HTTPResponse(fp, preload_content=False, auto_close=False)
- reader = BufferedReader(resp)
+ reader = BufferedReader(resp) # type: ignore[arg-type]
assert list(reader) == [b"hello\n", b"world"]
assert not reader.closed
@@ -390,10 +394,10 @@ class TestResponse:
with pytest.raises(ValueError, match="readline of closed file"):
next(reader)
- def test_io_textiowrapper(self):
+ def test_io_textiowrapper(self) -> None:
fp = BytesIO(b"\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f")
resp = HTTPResponse(fp, preload_content=False)
- br = TextIOWrapper(resp, encoding="utf8")
+ br = TextIOWrapper(resp, encoding="utf8") # type: ignore[arg-type]
assert br.read() == "äöüß"
@@ -407,14 +411,14 @@ class TestResponse:
)
resp = HTTPResponse(fp, preload_content=False)
with pytest.raises(ValueError, match="I/O operation on closed file.?"):
- list(TextIOWrapper(resp))
+ list(TextIOWrapper(resp)) # type: ignore[arg-type]
- def test_io_not_autoclose_textiowrapper(self):
+ def test_io_not_autoclose_textiowrapper(self) -> None:
fp = BytesIO(
b"\xc3\xa4\xc3\xb6\xc3\xbc\xc3\x9f\n\xce\xb1\xce\xb2\xce\xb3\xce\xb4"
)
resp = HTTPResponse(fp, preload_content=False, auto_close=False)
- reader = TextIOWrapper(resp, encoding="utf8")
+ reader = TextIOWrapper(resp, encoding="utf8") # type: ignore[arg-type]
assert list(reader) == ["äöüß\n", "αβγδ"]
assert not reader.closed
@@ -428,7 +432,7 @@ class TestResponse:
with pytest.raises(ValueError, match="I/O operation on closed file.?"):
next(reader)
- def test_streaming(self):
+ def test_streaming(self) -> None:
fp = BytesIO(b"foo")
resp = HTTPResponse(fp, preload_content=False)
stream = resp.stream(2, decode_content=False)
@@ -438,7 +442,7 @@ class TestResponse:
with pytest.raises(StopIteration):
next(stream)
- def test_streaming_tell(self):
+ def test_streaming_tell(self) -> None:
fp = BytesIO(b"foo")
resp = HTTPResponse(fp, preload_content=False)
stream = resp.stream(2, decode_content=False)
@@ -456,7 +460,7 @@ class TestResponse:
with pytest.raises(StopIteration):
next(stream)
- def test_gzipped_streaming(self):
+ def test_gzipped_streaming(self) -> None:
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
data = compress.compress(b"foo")
data += compress.flush()
@@ -472,7 +476,7 @@ class TestResponse:
with pytest.raises(StopIteration):
next(stream)
- def test_gzipped_streaming_tell(self):
+ def test_gzipped_streaming_tell(self) -> None:
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
uncompressed_data = b"foo"
data = compress.compress(uncompressed_data)
@@ -493,7 +497,7 @@ class TestResponse:
with pytest.raises(StopIteration):
next(stream)
- def test_deflate_streaming_tell_intermediate_point(self):
+ def test_deflate_streaming_tell_intermediate_point(self) -> None:
# Ensure that ``tell()`` returns the correct number of bytes when
# part-way through streaming compressed content.
NUMBER_OF_READS = 10
@@ -504,7 +508,7 @@ class TestResponse:
calls to ``read``.
"""
- def __init__(self, payload, payload_part_size):
+ def __init__(self, payload: bytes, payload_part_size: int) -> None:
self.payloads = [
payload[i * payload_part_size : (i + 1) * payload_part_size]
for i in range(NUMBER_OF_READS + 1)
@@ -512,7 +516,7 @@ class TestResponse:
assert b"".join(self.payloads) == payload
- def read(self, _):
+ def read(self, _: int) -> bytes: # type: ignore[override]
# Amount is unused.
if len(self.payloads) > 0:
return self.payloads.pop(0)
@@ -546,7 +550,7 @@ class TestResponse:
# Check that the end of the stream is in the correct place
assert len(ZLIB_PAYLOAD) == end_of_stream
- def test_deflate_streaming(self):
+ def test_deflate_streaming(self) -> None:
data = zlib.compress(b"foo")
fp = BytesIO(data)
@@ -560,7 +564,7 @@ class TestResponse:
with pytest.raises(StopIteration):
next(stream)
- def test_deflate2_streaming(self):
+ def test_deflate2_streaming(self) -> None:
compress = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
data = compress.compress(b"foo")
data += compress.flush()
@@ -576,7 +580,7 @@ class TestResponse:
with pytest.raises(StopIteration):
next(stream)
- def test_empty_stream(self):
+ def test_empty_stream(self) -> None:
fp = BytesIO(b"")
resp = HTTPResponse(fp, preload_content=False)
stream = resp.stream(2, decode_content=False)
@@ -584,19 +588,19 @@ class TestResponse:
with pytest.raises(StopIteration):
next(stream)
- def test_length_no_header(self):
+ def test_length_no_header(self) -> None:
fp = BytesIO(b"12345")
resp = HTTPResponse(fp, preload_content=False)
assert resp.length_remaining is None
- def test_length_w_valid_header(self):
+ def test_length_w_valid_header(self) -> None:
headers = {"content-length": "5"}
fp = BytesIO(b"12345")
resp = HTTPResponse(fp, headers=headers, preload_content=False)
assert resp.length_remaining == 5
- def test_length_w_bad_header(self):
+ def test_length_w_bad_header(self) -> None:
garbage = {"content-length": "foo"}
fp = BytesIO(b"12345")
@@ -607,7 +611,7 @@ class TestResponse:
resp = HTTPResponse(fp, headers=garbage, preload_content=False)
assert resp.length_remaining is None
- def test_length_when_chunked(self):
+ def test_length_when_chunked(self) -> None:
# This is expressly forbidden in RFC 7230 sec 3.3.2
# We fall back to chunked in this case and try to
# handle response ignoring content length.
@@ -617,7 +621,7 @@ class TestResponse:
resp = HTTPResponse(fp, headers=headers, preload_content=False)
assert resp.length_remaining is None
- def test_length_with_multiple_content_lengths(self):
+ def test_length_with_multiple_content_lengths(self) -> None:
headers = {"content-length": "5, 5, 5"}
garbage = {"content-length": "5, 42"}
fp = BytesIO(b"abcde")
@@ -628,7 +632,7 @@ class TestResponse:
with pytest.raises(InvalidHeader):
HTTPResponse(fp, headers=garbage, preload_content=False)
- def test_length_after_read(self):
+ def test_length_after_read(self) -> None:
headers = {"content-length": "5"}
# Test no defined length
@@ -650,27 +654,29 @@ class TestResponse:
next(data)
assert resp.length_remaining == 3
- def test_mock_httpresponse_stream(self):
+ def test_mock_httpresponse_stream(self) -> None:
# Mock out a HTTP Request that does enough to make it through urllib3's
# read() and close() calls, and also exhausts and underlying file
# object.
class MockHTTPRequest:
- self.fp = None
+ def __init__(self) -> None:
+ self.fp: Optional[BytesIO] = None
- def read(self, amt):
+ def read(self, amt: int) -> bytes:
+ assert self.fp is not None
data = self.fp.read(amt)
if not data:
self.fp = None
return data
- def close(self):
+ def close(self) -> None:
self.fp = None
bio = BytesIO(b"foo")
fp = MockHTTPRequest()
fp.fp = bio
- resp = HTTPResponse(fp, preload_content=False)
+ resp = HTTPResponse(fp, preload_content=False) # type: ignore[arg-type]
stream = resp.stream(2)
assert next(stream) == b"fo"
@@ -678,11 +684,11 @@ class TestResponse:
with pytest.raises(StopIteration):
next(stream)
- def test_mock_transfer_encoding_chunked(self):
+ def test_mock_transfer_encoding_chunked(self) -> None:
stream = [b"fo", b"o", b"bar"]
fp = MockChunkedEncodingResponse(stream)
- r = httplib.HTTPResponse(MockSock)
- r.fp = fp
+ r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
+ r.fp = fp # type: ignore[attr-defined]
resp = HTTPResponse(
r, preload_content=False, headers={"transfer-encoding": "chunked"}
)
@@ -690,10 +696,10 @@ class TestResponse:
for i, c in enumerate(resp.stream()):
assert c == stream[i]
- def test_mock_gzipped_transfer_encoding_chunked_decoded(self):
+ def test_mock_gzipped_transfer_encoding_chunked_decoded(self) -> None:
"""Show that we can decode the gzipped and chunked body."""
- def stream():
+ def stream() -> Generator[bytes, None, None]:
# Set up a generator to chunk the gzipped body
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
data = compress.compress(b"foobar")
@@ -702,8 +708,8 @@ class TestResponse:
yield data[i : i + 2]
fp = MockChunkedEncodingResponse(list(stream()))
- r = httplib.HTTPResponse(MockSock)
- r.fp = fp
+ r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
+ r.fp = fp # type: ignore[attr-defined]
headers = {"transfer-encoding": "chunked", "content-encoding": "gzip"}
resp = HTTPResponse(r, preload_content=False, headers=headers)
@@ -713,13 +719,13 @@ class TestResponse:
assert b"foobar" == data
- def test_mock_transfer_encoding_chunked_custom_read(self):
+ def test_mock_transfer_encoding_chunked_custom_read(self) -> None:
stream = [b"foooo", b"bbbbaaaaar"]
fp = MockChunkedEncodingResponse(stream)
- r = httplib.HTTPResponse(MockSock)
- r.fp = fp
- r.chunked = True
- r.chunk_left = None
+ r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
+ r.fp = fp # type: ignore[attr-defined]
+ r.chunked = True # type: ignore[attr-defined]
+ r.chunk_left = None # type: ignore[attr-defined]
resp = HTTPResponse(
r, preload_content=False, headers={"transfer-encoding": "chunked"}
)
@@ -727,26 +733,26 @@ class TestResponse:
response = list(resp.read_chunked(2))
assert expected_response == response
- def test_mock_transfer_encoding_chunked_unlmtd_read(self):
+ def test_mock_transfer_encoding_chunked_unlmtd_read(self) -> None:
stream = [b"foooo", b"bbbbaaaaar"]
fp = MockChunkedEncodingResponse(stream)
- r = httplib.HTTPResponse(MockSock)
- r.fp = fp
- r.chunked = True
- r.chunk_left = None
+ r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
+ r.fp = fp # type: ignore[attr-defined]
+ r.chunked = True # type: ignore[attr-defined]
+ r.chunk_left = None # type: ignore[attr-defined]
resp = HTTPResponse(
r, preload_content=False, headers={"transfer-encoding": "chunked"}
)
assert stream == list(resp.read_chunked())
- def test_read_not_chunked_response_as_chunks(self):
+ def test_read_not_chunked_response_as_chunks(self) -> None:
fp = BytesIO(b"foo")
resp = HTTPResponse(fp, preload_content=False)
r = resp.read_chunked()
with pytest.raises(ResponseNotChunked):
next(r)
- def test_read_chunked_not_supported(self):
+ def test_read_chunked_not_supported(self) -> None:
fp = BytesIO(b"foo")
resp = HTTPResponse(
fp, preload_content=False, headers={"transfer-encoding": "chunked"}
@@ -755,7 +761,7 @@ class TestResponse:
with pytest.raises(BodyNotHttplibCompatible):
next(r)
- def test_buggy_incomplete_read(self):
+ def test_buggy_incomplete_read(self) -> None:
# Simulate buggy versions of Python (<2.7.4)
# See http://bugs.python.org/issue16298
content_length = 1337
@@ -774,13 +780,13 @@ class TestResponse:
assert orig_ex.partial == 0
assert orig_ex.expected == content_length
- def test_incomplete_chunk(self):
+ def test_incomplete_chunk(self) -> None:
stream = [b"foooo", b"bbbbaaaaar"]
fp = MockChunkedIncompleteRead(stream)
- r = httplib.HTTPResponse(MockSock)
- r.fp = fp
- r.chunked = True
- r.chunk_left = None
+ r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
+ r.fp = fp # type: ignore[attr-defined]
+ r.chunked = True # type: ignore[attr-defined]
+ r.chunk_left = None # type: ignore[attr-defined]
resp = HTTPResponse(
r, preload_content=False, headers={"transfer-encoding": "chunked"}
)
@@ -790,13 +796,13 @@ class TestResponse:
orig_ex = ctx.value.args[1]
assert isinstance(orig_ex, httplib_IncompleteRead)
- def test_invalid_chunk_length(self):
+ def test_invalid_chunk_length(self) -> None:
stream = [b"foooo", b"bbbbaaaaar"]
fp = MockChunkedInvalidChunkLength(stream)
- r = httplib.HTTPResponse(MockSock)
- r.fp = fp
- r.chunked = True
- r.chunk_left = None
+ r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
+ r.fp = fp # type: ignore[attr-defined]
+ r.chunked = True # type: ignore[attr-defined]
+ r.chunk_left = None # type: ignore[attr-defined]
resp = HTTPResponse(
r, preload_content=False, headers={"transfer-encoding": "chunked"}
)
@@ -804,37 +810,42 @@ class TestResponse:
next(resp.read_chunked())
orig_ex = ctx.value.args[1]
+ msg = (
+ "(\"Connection broken: InvalidChunkLength(got length b'ZZZ\\\\r\\\\n', 0 bytes read)\", "
+ "InvalidChunkLength(got length b'ZZZ\\r\\n', 0 bytes read))"
+ )
+ assert str(ctx.value) == msg
assert isinstance(orig_ex, InvalidChunkLength)
assert orig_ex.length == fp.BAD_LENGTH_LINE.encode()
- def test_chunked_response_without_crlf_on_end(self):
+ def test_chunked_response_without_crlf_on_end(self) -> None:
stream = [b"foo", b"bar", b"baz"]
fp = MockChunkedEncodingWithoutCRLFOnEnd(stream)
- r = httplib.HTTPResponse(MockSock)
- r.fp = fp
- r.chunked = True
- r.chunk_left = None
+ r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
+ r.fp = fp # type: ignore[attr-defined]
+ r.chunked = True # type: ignore[attr-defined]
+ r.chunk_left = None # type: ignore[attr-defined]
resp = HTTPResponse(
r, preload_content=False, headers={"transfer-encoding": "chunked"}
)
assert stream == list(resp.stream())
- def test_chunked_response_with_extensions(self):
+ def test_chunked_response_with_extensions(self) -> None:
stream = [b"foo", b"bar"]
fp = MockChunkedEncodingWithExtensions(stream)
- r = httplib.HTTPResponse(MockSock)
- r.fp = fp
- r.chunked = True
- r.chunk_left = None
+ r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
+ r.fp = fp # type: ignore[attr-defined]
+ r.chunked = True # type: ignore[attr-defined]
+ r.chunk_left = None # type: ignore[attr-defined]
resp = HTTPResponse(
r, preload_content=False, headers={"transfer-encoding": "chunked"}
)
assert stream == list(resp.stream())
- def test_chunked_head_response(self):
- r = httplib.HTTPResponse(MockSock, method="HEAD")
- r.chunked = True
- r.chunk_left = None
+ def test_chunked_head_response(self) -> None:
+ r = httplib.HTTPResponse(MockSock, method="HEAD") # type: ignore[arg-type]
+ r.chunked = True # type: ignore[attr-defined]
+ r.chunk_left = None # type: ignore[attr-defined]
resp = HTTPResponse(
"",
preload_content=False,
@@ -843,19 +854,19 @@ class TestResponse:
)
assert resp.chunked is True
- resp.supports_chunked_reads = lambda: True
- resp.release_conn = mock.Mock()
+ setattr(resp, "supports_chunked_reads", lambda: True)
+ setattr(resp, "release_conn", mock.Mock())
for _ in resp.stream():
continue
- resp.release_conn.assert_called_once_with()
+ resp.release_conn.assert_called_once_with() # type: ignore[attr-defined]
- def test_get_case_insensitive_headers(self):
+ def test_get_case_insensitive_headers(self) -> None:
headers = {"host": "example.com"}
r = HTTPResponse(headers=headers)
assert r.headers.get("host") == "example.com"
assert r.headers.get("Host") == "example.com"
- def test_retries(self):
+ def test_retries(self) -> None:
fp = BytesIO(b"")
resp = HTTPResponse(fp)
assert resp.retries is None
@@ -863,13 +874,13 @@ class TestResponse:
resp = HTTPResponse(fp, retries=retry)
assert resp.retries == retry
- def test_geturl(self):
+ def test_geturl(self) -> None:
fp = BytesIO(b"")
request_url = "https://example.com"
resp = HTTPResponse(fp, request_url=request_url)
assert resp.geturl() == request_url
- def test_url(self):
+ def test_url(self) -> None:
fp = BytesIO(b"")
request_url = "https://example.com"
resp = HTTPResponse(fp, request_url=request_url)
@@ -877,10 +888,10 @@ class TestResponse:
resp.url = "https://anotherurl.com"
assert resp.url == "https://anotherurl.com"
- def test_geturl_retries(self):
+ def test_geturl_retries(self) -> None:
fp = BytesIO(b"")
resp = HTTPResponse(fp, request_url="http://example.com")
- request_histories = [
+ request_histories = (
RequestHistory(
method="GET",
url="http://example.com",
@@ -895,7 +906,7 @@ class TestResponse:
status=301,
redirect_location="https://www.example.com",
),
- ]
+ )
retry = Retry(history=request_histories)
resp = HTTPResponse(fp, retries=retry)
assert resp.geturl() == "https://www.example.com"
@@ -910,15 +921,15 @@ class TestResponse:
(b"Hello\nworld\n\n\n!", [b"Hello\n", b"world\n", b"\n", b"\n", b"!"]),
],
)
- def test__iter__(self, payload, expected_stream):
+ def test__iter__(self, payload: bytes, expected_stream: List[bytes]) -> None:
actual_stream = []
for chunk in HTTPResponse(BytesIO(payload), preload_content=False):
actual_stream.append(chunk)
assert actual_stream == expected_stream
- def test__iter__decode_content(self):
- def stream():
+ def test__iter__decode_content(self) -> None:
+ def stream() -> Generator[bytes, None, None]:
# Set up a generator to chunk the gzipped body
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
data = compress.compress(b"foo\nbar")
@@ -927,8 +938,8 @@ class TestResponse:
yield data[i : i + 2]
fp = MockChunkedEncodingResponse(list(stream()))
- r = httplib.HTTPResponse(MockSock)
- r.fp = fp
+ r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type]
+ r.fp = fp # type: ignore[attr-defined]
headers = {"transfer-encoding": "chunked", "content-encoding": "gzip"}
resp = HTTPResponse(r, preload_content=False, headers=headers)
@@ -938,13 +949,13 @@ class TestResponse:
assert b"foo\nbar" == data
- def test_non_timeout_ssl_error_on_read(self):
+ def test_non_timeout_ssl_error_on_read(self) -> None:
mac_error = ssl.SSLError(
"SSL routines", "ssl3_get_record", "decryption failed or bad record mac"
)
@contextlib.contextmanager
- def make_bad_mac_fp():
+ def make_bad_mac_fp() -> Generator[BytesIO, None, None]:
fp = BytesIO(b"")
with mock.patch.object(fp, "read") as fp_read:
# mac/decryption error
@@ -964,7 +975,7 @@ class TestResponse:
class MockChunkedEncodingResponse:
- def __init__(self, content):
+ def __init__(self, content: List[bytes]) -> None:
"""
content: collection of str, each str is a chunk in response
"""
@@ -974,13 +985,12 @@ class MockChunkedEncodingResponse:
self.cur_chunk = b""
self.chunks_exhausted = False
- @staticmethod
- def _encode_chunk(chunk):
+ def _encode_chunk(self, chunk: bytes) -> bytes:
# In the general case, we can't decode the chunk to unicode
length = f"{len(chunk):X}\r\n"
return length.encode() + chunk + b"\r\n"
- def _pop_new_chunk(self):
+ def _pop_new_chunk(self) -> bytes:
if self.chunks_exhausted:
return b""
try:
@@ -993,9 +1003,10 @@ class MockChunkedEncodingResponse:
chunk = self._encode_chunk(chunk)
if not isinstance(chunk, bytes):
chunk = chunk.encode()
+ assert isinstance(chunk, bytes)
return chunk
- def pop_current_chunk(self, amt=-1, till_crlf=False):
+ def pop_current_chunk(self, amt: int = -1, till_crlf: bool = False) -> bytes:
if amt > 0 and till_crlf:
raise ValueError("Can't specify amt and till_crlf.")
if len(self.cur_chunk) <= 0:
@@ -1025,47 +1036,47 @@ class MockChunkedEncodingResponse:
self.cur_chunk = self.cur_chunk[amt:]
return chunk_part
- def readline(self):
+ def readline(self) -> bytes:
return self.pop_current_chunk(till_crlf=True)
- def read(self, amt=-1):
+ def read(self, amt: int = -1) -> bytes:
return self.pop_current_chunk(amt)
- def flush(self):
+ def flush(self) -> None:
# Python 3 wants this method.
pass
- def close(self):
+ def close(self) -> None:
self.closed = True
class MockChunkedIncompleteRead(MockChunkedEncodingResponse):
- def _encode_chunk(self, chunk):
- return f"9999\r\n{chunk.decode()}\r\n"
+ def _encode_chunk(self, chunk: bytes) -> bytes:
+ return f"9999\r\n{chunk.decode()}\r\n".encode()
class MockChunkedInvalidChunkLength(MockChunkedEncodingResponse):
BAD_LENGTH_LINE = "ZZZ\r\n"
- def _encode_chunk(self, chunk):
- return f"{self.BAD_LENGTH_LINE}{chunk.decode()}\r\n"
+ def _encode_chunk(self, chunk: bytes) -> bytes:
+ return f"{self.BAD_LENGTH_LINE}{chunk.decode()}\r\n".encode()
class MockChunkedEncodingWithoutCRLFOnEnd(MockChunkedEncodingResponse):
- def _encode_chunk(self, chunk):
+ def _encode_chunk(self, chunk: bytes) -> bytes:
return "{:X}\r\n{}{}".format(
len(chunk),
chunk.decode(),
"\r\n" if len(chunk) > 0 else "",
- )
+ ).encode()
class MockChunkedEncodingWithExtensions(MockChunkedEncodingResponse):
- def _encode_chunk(self, chunk):
- return f"{len(chunk):X};asd=qwe\r\n{chunk.decode()}\r\n"
+ def _encode_chunk(self, chunk: bytes) -> bytes:
+ return f"{len(chunk):X};asd=qwe\r\n{chunk.decode()}\r\n".encode()
class MockSock:
@classmethod
- def makefile(cls, *args, **kwargs):
+ def makefile(cls, *args: Any, **kwargs: Any) -> None:
return