summaryrefslogtreecommitdiff
path: root/test/with_dummyserver/test_socketlevel.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/with_dummyserver/test_socketlevel.py')
-rw-r--r--test/with_dummyserver/test_socketlevel.py48
1 files changed, 48 insertions, 0 deletions
diff --git a/test/with_dummyserver/test_socketlevel.py b/test/with_dummyserver/test_socketlevel.py
index db481d30..6c996538 100644
--- a/test/with_dummyserver/test_socketlevel.py
+++ b/test/with_dummyserver/test_socketlevel.py
@@ -18,6 +18,8 @@ from dummyserver.testcase import SocketDummyServerTestCase
from dummyserver.server import (
DEFAULT_CERTS, DEFAULT_CA, get_unreachable_address)
+from .. import onlyPy3
+
from nose.plugins.skip import SkipTest
from threading import Event
import socket
@@ -598,3 +600,49 @@ class TestErrorWrapping(SocketDummyServerTestCase):
self._start_server(handler)
pool = HTTPConnectionPool(self.host, self.port, retries=False)
self.assertRaises(ProtocolError, pool.request, 'GET', '/')
+
+class TestHeaders(SocketDummyServerTestCase):
+
+ @onlyPy3
+ def test_httplib_headers_case_insensitive(self):
+ handler = create_response_handler(
+ b'HTTP/1.1 200 OK\r\n'
+ b'Content-Length: 0\r\n'
+ b'Content-type: text/plain\r\n'
+ b'\r\n'
+ )
+ self._start_server(handler)
+ pool = HTTPConnectionPool(self.host, self.port, retries=False)
+ HEADERS = {'Content-Length': '0', 'Content-type': 'text/plain'}
+ r = pool.request('GET', '/')
+ self.assertEqual(HEADERS, dict(r.headers.items())) # to preserve case sensitivity
+
+
+class TestHEAD(SocketDummyServerTestCase):
+ def test_chunked_head_response_does_not_hang(self):
+ handler = create_response_handler(
+ b'HTTP/1.1 200 OK\r\n'
+ b'Transfer-Encoding: chunked\r\n'
+ b'Content-type: text/plain\r\n'
+ b'\r\n'
+ )
+ self._start_server(handler)
+ pool = HTTPConnectionPool(self.host, self.port, retries=False)
+ r = pool.request('HEAD', '/', timeout=1, preload_content=False)
+
+ # stream will use the read_chunked method here.
+ self.assertEqual([], list(r.stream()))
+
+ def test_empty_head_response_does_not_hang(self):
+ handler = create_response_handler(
+ b'HTTP/1.1 200 OK\r\n'
+ b'Content-Length: 256\r\n'
+ b'Content-type: text/plain\r\n'
+ b'\r\n'
+ )
+ self._start_server(handler)
+ pool = HTTPConnectionPool(self.host, self.port, retries=False)
+ r = pool.request('HEAD', '/', timeout=1, preload_content=False)
+
+ # stream will use the read method here.
+ self.assertEqual([], list(r.stream()))