summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCory Benfield <lukasaoz@gmail.com>2017-05-30 10:15:51 +0100
committerGitHub <noreply@github.com>2017-05-30 10:15:51 +0100
commitee227ab53cfd590b725dfe7b90ddc80a8426bca5 (patch)
tree332cc96a26f8d384b68ddc9d739d95fc200e9fb8
parent20d0e8681c6dfbc0f7590a949feb3e692fa32275 (diff)
parentda8368a3f2d53fcec136806349e2631dc2c363c7 (diff)
downloadurllib3-v2.tar.gz
Merge pull request #1173 from Lukasa/rebuild-sync-connection-loopv2
Rebuild sync connection loop.
-rw-r--r--test/contrib/test_securetransport.py2
-rw-r--r--test/test_sync_connection.py434
-rw-r--r--urllib3/sync_connection.py304
3 files changed, 645 insertions, 95 deletions
diff --git a/test/contrib/test_securetransport.py b/test/contrib/test_securetransport.py
index 96fc57ea..390872b9 100644
--- a/test/contrib/test_securetransport.py
+++ b/test/contrib/test_securetransport.py
@@ -7,6 +7,8 @@ try:
except ImportError as e:
raise SkipTest('Could not import SecureTransport: %r' % e)
+raise SkipTest('SecureTransport currently not supported in v2!')
+
from ..with_dummyserver.test_https import TestHTTPS, TestHTTPS_TLSv1 # noqa: F401
from ..with_dummyserver.test_socketlevel import ( # noqa: F401
TestSNI, TestSocketClosing, TestClientCerts
diff --git a/test/test_sync_connection.py b/test/test_sync_connection.py
new file mode 100644
index 00000000..b5680a74
--- /dev/null
+++ b/test/test_sync_connection.py
@@ -0,0 +1,434 @@
+"""
+Low-level synchronous connection tests.
+
+These tests involve mocking out the network layer to cause specific unusual
+behaviours to occur. The goal is to ensure that the synchronous connection
+layer can handle unexpected network weather without falling over, and without
+expending undue effort to arrange that these effects actually happen on a real
+network.
+"""
+import collections
+import errno
+import socket
+import ssl
+import unittest
+
+import h11
+
+from urllib3.base import Request
+from urllib3.sync_connection import SyncHTTP1Connection
+from urllib3.util import selectors
+
+
+# Objects and globals for handling scenarios.
+Event = collections.namedtuple('Event', ['expected_object', 'event', 'meta'])
+
+SELECTOR = "SELECTOR"
+SOCKET = "SOCKET"
+RAISE_EAGAIN = "RAISE_EAGAIN"
+RAISE_WANT_READ = "RAISE_WANT_READ"
+RAISE_WANT_WRITE = "RAISE_WANT_WRITE"
+
+EVENT_SELECT = "EVENT_SELECT"
+
+EVENT_SEND = "EVENT_SEND"
+SEND_ALL = "SEND_ALL"
+
+EVENT_RECV = "EVENT_RECV"
+RECV_ALL = "RECV_ALL"
+
+
+# A number of helpful shorthands for common events.
+SELECT_UPLOAD_WRITE = Event(
+ SELECTOR,
+ EVENT_SELECT,
+ (selectors.EVENT_READ | selectors.EVENT_WRITE, selectors.EVENT_WRITE)
+)
+SELECT_UPLOAD_READ = Event(
+ SELECTOR,
+ EVENT_SELECT,
+ (selectors.EVENT_READ | selectors.EVENT_WRITE, selectors.EVENT_READ)
+)
+SELECT_DOWNLOAD_READ = Event(
+ SELECTOR, EVENT_SELECT, (selectors.EVENT_READ, selectors.EVENT_READ)
+)
+SELECT_DOWNLOAD_WRITE = Event(
+ SELECTOR, EVENT_SELECT, (selectors.EVENT_READ, selectors.EVENT_READ)
+)
+SELECT_WRITABLE_WRITE = Event(
+ SELECTOR, EVENT_SELECT, (selectors.EVENT_WRITE, selectors.EVENT_WRITE)
+)
+SOCKET_SEND_ALL = Event(SOCKET, EVENT_SEND, (SEND_ALL,))
+SOCKET_SEND_5 = Event(SOCKET, EVENT_SEND, (5,))
+SOCKET_SEND_EAGAIN = Event(SOCKET, EVENT_SEND, (RAISE_EAGAIN,))
+SOCKET_SEND_WANTREAD = Event(SOCKET, EVENT_SEND, (RAISE_WANT_READ,))
+SOCKET_SEND_WANTWRITE = Event(SOCKET, EVENT_SEND, (RAISE_WANT_WRITE,))
+SOCKET_RECV_ALL = Event(SOCKET, EVENT_RECV, (RECV_ALL,))
+SOCKET_RECV_5 = Event(SOCKET, EVENT_RECV, (5,))
+SOCKET_RECV_EAGAIN = Event(SOCKET, EVENT_RECV, (RAISE_EAGAIN,))
+SOCKET_RECV_WANTREAD = Event(SOCKET, EVENT_RECV, (RAISE_WANT_READ,))
+SOCKET_RECV_WANTWRITE = Event(SOCKET, EVENT_RECV, (RAISE_WANT_WRITE,))
+
+
+REQUEST = (
+ b'GET / HTTP/1.1\r\n'
+ b'host: localhost\r\n'
+ b'\r\n'
+)
+RESPONSE = (
+ b'HTTP/1.1 200 OK\r\n'
+ b'Server: totallyarealserver/1.0.0\r\n'
+ b'Content-Length: 8\r\n'
+ b'Content-Type: text/plain\r\n'
+ b'\r\n'
+ b'complete'
+)
+
+
+class ScenarioError(Exception):
+ """
+ An error occurred with running the scenario.
+ """
+ pass
+
+
+class ScenarioSelector(object):
+ """
+ A mock Selector object. This selector implements a tiny bit of the selector
+ API (only that which is used by the higher layers), and response to select
+ based on the scenario it is provided.
+ """
+ def __init__(self, scenario, sock):
+ self._scenario = scenario
+ self._fd = sock
+ self._events = None
+
+ def register(self, fd, events):
+ if fd is not self._fd:
+ raise ScenarioError("Registered unexpected socket!")
+ self._events = events
+
+ def modify(self, fd, events):
+ if fd is not self._fd:
+ raise ScenarioError("Modifying unexpected socket!")
+ self._events = events
+
+ def select(self, timeout=None):
+ expected_object, event, args = self._scenario.pop(0)
+ if expected_object is not SELECTOR:
+ raise ScenarioError("Received non selector event!")
+
+ if event is not EVENT_SELECT:
+ raise ScenarioError("Expected EVENT_SELECT, got %s" % event)
+
+ expected_events, returned_event = args
+ if self._events != expected_events:
+ raise ScenarioError(
+ "Expected events %s, got %s" % (self._events, expected_events)
+ )
+
+ key = self.get_key(self._fd)
+ return [(key, returned_event)]
+
+ def get_key(self, fd):
+ if fd is not self._fd:
+ raise ScenarioError("Querying unexpected socket!")
+ return selectors.SelectorKey(
+ self._fd,
+ 1,
+ self._events,
+ None
+ )
+
+ def close(self):
+ pass
+
+
+class ScenarioSocket(object):
+ """
+ A mock Socket object. This object implements a tiny bit of the socket API
+ (only that which is used by the synchronous connection), and responds to
+ socket calls based on the scenario it is provided.
+ """
+ def __init__(self, scenario):
+ self._scenario = scenario
+ self._data_to_send = RESPONSE
+ self._data_sent = b''
+ self._closed = False
+
+ def _raise_errors(self, possible_error):
+ if possible_error is RAISE_EAGAIN:
+ raise socket.error(errno.EAGAIN, "try again later")
+ elif possible_error is RAISE_WANT_READ:
+ raise ssl.SSLWantReadError("Want read")
+ elif possible_error is RAISE_WANT_WRITE:
+ raise ssl.SSLWantWriteError("Want write")
+
+ def send(self, data):
+ expected_object, event, args = self._scenario.pop(0)
+ if expected_object is not SOCKET:
+ raise ScenarioError("Received non selector event!")
+
+ if event is not EVENT_SEND:
+ raise ScenarioError("Expected EVENT_SEND, got %s" % event)
+
+ amount, = args
+ self._raise_errors(amount)
+ if amount is SEND_ALL:
+ amount = len(data)
+
+ self._data_sent += data[:amount].tobytes()
+ return amount
+
+ def recv(self, amt):
+ expected_object, event, args = self._scenario.pop(0)
+ if expected_object is not SOCKET:
+ raise ScenarioError("Received non selector event!")
+
+ if event is not EVENT_RECV:
+ raise ScenarioError("Expected EVENT_RECV, got %s" % event)
+
+ amount, = args
+ self._raise_errors(amount)
+ if amount is RECV_ALL:
+ amount = min(len(RESPONSE), amt)
+
+ rdata = self._data_to_send[:amount]
+ self._data_to_send = self._data_to_send[amount:]
+ return rdata
+
+ def setblocking(self, *args):
+ pass
+
+ def close(self):
+ self._closed = True
+
+
+class TestUnusualSocketConditions(unittest.TestCase):
+ """
+ This class contains tests that take strict control over sockets and
+ selectors. The goal here is to simulate unusual network conditions that are
+ extremely difficult to reproducibly simulate even with socketlevel tests in
+ which we control both ends of the connection. For example, these tests
+ will trigger WANT_READ and WANT_WRITE errors in TLS stacks which are
+ otherwise extremely hard to trigger, and will also fire EAGAIN on sockets
+ marked readable/writable, which can technically happen but are extremely
+ tricky to trigger by using actual sockets and the loopback interface.
+
+ These tests are necessarily not a perfect replacement for actual realworld
+ examples, but those are so prohibitively difficult to trigger that these
+ will have to do instead.
+ """
+ # A stub value of the read timeout that will be used by the selector.
+ # This should not be edited by tests: only used as a reference for what
+ # delay values they can use to force things to time out.
+ READ_TIMEOUT = 5
+
+ def run_scenario(self, scenario):
+ conn = SyncHTTP1Connection('localhost', 80)
+ conn._state_machine = h11.Connection(our_role=h11.CLIENT)
+ conn._sock = sock = ScenarioSocket(scenario)
+ conn._selector = ScenarioSelector(scenario, sock)
+
+ request = Request(method=b'GET', target=b'/')
+ request.add_host(host=b'localhost', port=80, scheme='http')
+ response = conn.send_request(request, read_timeout=self.READ_TIMEOUT)
+ body = b''.join(response.body)
+
+ # The scenario should be totally consumed.
+ self.assertFalse(scenario)
+
+ # Validate that the response is complete.
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(body, b'complete')
+ self.assertEqual(response.version, b'HTTP/1.1')
+ self.assertEqual(len(response.headers), 3)
+ self.assertEqual(response.headers['server'], 'totallyarealserver/1.0.0')
+ self.assertEqual(response.headers['content-length'], '8')
+ self.assertEqual(response.headers['content-type'], 'text/plain')
+
+ return sock
+
+ def test_happy_path(self):
+ """
+ When everything goes smoothly, the response is cleanly consumed.
+ """
+ scenario = [
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_ALL,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_ALL,
+ ]
+ sock = self.run_scenario(scenario)
+ self.assertEqual(sock._data_sent, REQUEST)
+
+ def test_handle_recv_eagain_download(self):
+ """
+ When a socket is marked readable during response body download but
+ returns EAGAIN when read from, the code simply retries the read.
+ """
+ scenario = [
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_ALL,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_EAGAIN,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_EAGAIN,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_ALL,
+ ]
+ sock = self.run_scenario(scenario)
+ self.assertEqual(sock._data_sent, REQUEST)
+
+ def test_handle_recv_want_read_download(self):
+ """
+ When a socket is marked readable during response body download but
+ returns SSL_WANT_READ when read from, the code simply retries the read.
+ """
+ scenario = [
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_ALL,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_WANTREAD,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_WANTREAD,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_ALL,
+ ]
+ sock = self.run_scenario(scenario)
+ self.assertEqual(sock._data_sent, REQUEST)
+
+ def test_handle_recv_eagain_upload(self):
+ """
+ When a socket is marked readable during request upload but returns
+ EAGAIN when read from, the code ignores it and continues with upload.
+ """
+ scenario = [
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_5,
+ SELECT_UPLOAD_READ,
+ SOCKET_RECV_EAGAIN,
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_ALL,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_ALL,
+ ]
+ sock = self.run_scenario(scenario)
+ self.assertEqual(sock._data_sent, REQUEST)
+
+ def test_handle_recv_wantread_upload(self):
+ """
+ When a socket is marked readable during request upload but returns
+ WANT_READ when read from, the code ignores it and continues with upload.
+ """
+ scenario = [
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_5,
+ SELECT_UPLOAD_READ,
+ SOCKET_RECV_WANTREAD,
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_ALL,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_ALL,
+ ]
+ sock = self.run_scenario(scenario)
+ self.assertEqual(sock._data_sent, REQUEST)
+
+ def test_handle_send_eagain_upload(self):
+ """
+ When a socket is marked writable during request upload but returns
+ EAGAIN when written to, the code ignores it and continues with upload.
+ """
+ scenario = [
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_5,
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_EAGAIN,
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_ALL,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_ALL,
+ ]
+ sock = self.run_scenario(scenario)
+ self.assertEqual(sock._data_sent, REQUEST)
+
+ def test_handle_send_wantwrite_upload(self):
+ """
+ When a socket is marked writable during request upload but returns
+ WANT_WRITE when written to, the code ignores it and continues with
+ upload.
+ """
+ scenario = [
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_5,
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_WANTWRITE,
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_ALL,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_ALL,
+ ]
+ sock = self.run_scenario(scenario)
+ self.assertEqual(sock._data_sent, REQUEST)
+
+ def test_handle_early_response(self):
+ """
+ When a socket is marked readable during request upload, and any data is
+ read from the socket, the upload immediately stops and the response is
+ read.
+ """
+ scenario = [
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_5,
+ SELECT_UPLOAD_READ,
+ SOCKET_RECV_5,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_ALL,
+ ]
+ sock = self.run_scenario(scenario)
+ self.assertEqual(sock._data_sent, REQUEST[:5])
+ self.assertTrue(sock._closed)
+
+ def test_handle_want_read_during_upload(self):
+ """
+ When a socket is marked writable during request upload but returns
+ WANT_READ when written to, the code waits for the socket to become
+ readable and issues the write again.
+ """
+ scenario = [
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_5,
+ # Return WANT_READ twice for good measure.
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_WANTREAD,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_SEND_WANTREAD,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_SEND_ALL,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_ALL,
+ ]
+ sock = self.run_scenario(scenario)
+ self.assertEqual(sock._data_sent, REQUEST)
+
+ def test_handle_want_write_during_download(self):
+ """
+ When a socket is marked readable during response download but returns
+ WANT_WRITE when read from, the code waits for the socket to become
+ writable and issues the read again.
+ """
+ scenario = [
+ SELECT_UPLOAD_WRITE,
+ SOCKET_SEND_ALL,
+ # Return WANT_WRITE twice for good measure.
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_WANTWRITE,
+ SELECT_WRITABLE_WRITE,
+ SOCKET_RECV_WANTWRITE,
+ SELECT_WRITABLE_WRITE,
+ SOCKET_RECV_5,
+ SELECT_DOWNLOAD_READ,
+ SOCKET_RECV_ALL,
+ ]
+ sock = self.run_scenario(scenario)
+ self.assertEqual(sock._data_sent, REQUEST)
diff --git a/urllib3/sync_connection.py b/urllib3/sync_connection.py
index 95b6991b..425cf242 100644
--- a/urllib3/sync_connection.py
+++ b/urllib3/sync_connection.py
@@ -16,6 +16,7 @@ from __future__ import absolute_import
import collections
import datetime
+import errno
import itertools
import socket
import warnings
@@ -25,7 +26,8 @@ import h11
from .base import Request, Response
from .exceptions import (
ConnectTimeoutError, NewConnectionError, SubjectAltNameWarning,
- SystemTimeWarning, BadVersionError, FailedTunnelError, InvalidBodyError
+ SystemTimeWarning, BadVersionError, FailedTunnelError, InvalidBodyError,
+ ProtocolError
)
from .packages import six
from .util import selectors, connection, ssl_ as ssl_util
@@ -43,6 +45,9 @@ RECENT_DATE = datetime.date(2016, 1, 1)
_SUPPORTED_VERSIONS = frozenset([b'1.0', b'1.1'])
+# A sentinel object returned when some syscalls return EAGAIN.
+_EAGAIN = object()
+
def _headers_to_native_string(headers):
"""
@@ -141,24 +146,6 @@ def _body_bytes(request, state_machine):
yield state_machine.send(h11.EndOfMessage())
-def _maybe_read_response(data, state_machine):
- """
- Feeds some more data into the state machine and potentially returns a
- response object.
- """
- response = None
- event = None
- state_machine.receive_data(data)
-
- while event is not h11.NEED_DATA:
- event = state_machine.next_event()
- if isinstance(event, h11.Response):
- response = event
- break
-
- return response
-
-
def _response_from_h11(h11_response, body_object):
"""
Given a h11 Response object, build a urllib3 response object and return it.
@@ -195,6 +182,52 @@ def _build_tunnel_request(host, port, headers):
return tunnel_request
+def _wait_for_event(selector, sock, event, timeout):
+ """
+ Waits for a specific event on a socket for no more than the time in
+ timeout. Throws an exception if the timeout is exceeded.
+ """
+ old_events = selector.get_key(sock).events
+ try:
+ selector.modify(sock, event)
+ if not selector.select(timeout=timeout):
+ # TODO: Raise our own timeouts later
+ raise sock.timeout()
+ return
+ finally:
+ selector.modify(sock, old_events)
+
+
+def _recv_or_eagain(sock):
+ """
+ Calls recv on a non-blocking socket. Returns the number of bytes read or
+ the sentinel object _EAGAIN.
+ """
+ try:
+ return sock.recv(65536)
+ except ssl.SSLWantReadError:
+ return _EAGAIN
+ except (OSError, socket.error) as e:
+ if e.errno == errno.EAGAIN:
+ return _EAGAIN
+ raise
+
+
+def _write_or_eagain(sock, data):
+ """
+ Calls send on a non-blocking socket. Returns the number of bytes written or
+ the sentinel object _EAGAIN.
+ """
+ try:
+ return sock.send(data)
+ except ssl.SSLWantWriteError:
+ return _EAGAIN
+ except (OSError, socket.error) as e:
+ if e.errno == errno.EAGAIN:
+ return _EAGAIN
+ raise
+
+
_DEFAULT_SOCKET_OPTIONS = object()
@@ -280,7 +313,7 @@ class SyncHTTP1Connection(object):
return conn
- def _send_unless_readable(self, data):
+ def _send_unless_readable(self, state_machine, data):
"""
This method sends the data in ``data`` on the given socket. It will
abort early if the socket became readable for any reason.
@@ -288,6 +321,10 @@ class SyncHTTP1Connection(object):
If the socket became readable, this returns True. Otherwise, returns
False.
"""
+ # First, register the socket with the selector.
+ self._selector.modify(
+ self._sock, selectors.EVENT_READ | selectors.EVENT_WRITE
+ )
# We take a memoryview here because if the chunk is very large we're
# going to slice it a few times, and we'd like to avoid doing copies as
# we do that.
@@ -296,30 +333,151 @@ class SyncHTTP1Connection(object):
while chunk:
events = self._selector.select()[0][1] # TODO: timeout!
- # If the socket is readable, we stop uploading.
+ # The "happy path" here is that the socket has become marked
+ # writable. If that happens, we just call send. If this returns
+ # EAGAIN or SSL_WANT_WRITE, that's fine, we just spin around again.
+ #
+ # The less happy path here is that the socket has become marked
+ # *readable*. That is...problematic. It may be the case that there
+ # is data to receive from the remote peer. If there is, we want to
+ # stop uploading. However, in the TLS case this data may be
+ # triggering a TLS renegotiation, so the simple fact that the
+ # socket is readable is not a bug. So what we do is attempt to call
+ # recv. If it returns data, we shove it into our state machine and
+ # then break from the loop. If it returns EAGAIN, we assume that
+ # it was just TLS stuff and move on.
+ #
+ # Note that we only *actually* break from the loop if and when we
+ # get an actual final response header block. Prior to that point we
+ # will keep sending data. This allows 1XX header blocks to also be
+ # ignored.
if events & selectors.EVENT_READ:
+ data = _recv_or_eagain(self._sock)
+ if data is _EAGAIN:
+ continue
+
+ state_machine.receive_data(data)
return True
- assert events & selectors.EVENT_WRITE
- chunk_sent = self._sock.send(chunk)
- chunk = chunk[chunk_sent:]
+ if events & selectors.EVENT_WRITE:
+ # This `while` loop is present to prevent us doing too much
+ # selector polling. We already know the selector is writable:
+ # we don't need to ask again until a write actually succeeds or
+ # we get EAGAIN.
+ bytes_written = None
+ while bytes_written is None:
+ try:
+ bytes_written = _write_or_eagain(self._sock, chunk)
+ except ssl.SSLWantReadError:
+ # This is unlikely, but we should still tolerate it.
+ _wait_for_event(
+ self._selector,
+ self._sock,
+ selectors.EVENT_READ,
+ None # TODO: Timeout!
+ )
+ else:
+ if bytes_written is not _EAGAIN:
+ chunk = chunk[bytes_written:]
return False
- def _receive_bytes(self, read_timeout):
+ def send_request(self, request, read_timeout):
"""
- This method blocks until the socket is readable or the read times out
- (TODO), and then returns whatever data was read. Signals EOF the same
- way ``recv`` does: by returning the empty string.
+ Given a Request object, performs the logic required to get a response.
"""
- keys = self._selector.select(read_timeout)
- if not keys:
- # TODO: Raise our own timeouts later.
- raise socket.timeout()
- events = keys[0][1]
- assert events == selectors.EVENT_READ
- data = self._sock.recv(65536)
- return data
+ # Step 1: Send Request.
+ # TODO: Replace read_timeout with something smarter.
+ self._read_timeout = read_timeout
+
+ # Before we begin, confirm that the state machine is ok.
+ if (self._state_machine.our_state is not h11.IDLE or
+ self._state_machine.their_state is not h11.IDLE):
+ raise ProtocolError("Invalid internal state transition")
+
+ header_bytes = _request_to_bytes(request, self._state_machine)
+ body_chunks = _body_bytes(request, self._state_machine)
+ request_chunks = itertools.chain([header_bytes], body_chunks)
+ response = None
+
+ # First, register the socket with the selector.
+ self._selector.modify(
+ self._sock, selectors.EVENT_READ | selectors.EVENT_WRITE
+ )
+
+ # Next, send the body.
+ for chunk in request_chunks:
+ did_read = self._send_unless_readable(self._state_machine, chunk)
+ if did_read:
+ break
+
+ # Ok, we've sent the request. Now we want to read the response. This
+ # needs a different loop, slightly.
+ #
+ # While reading, we are again looping around in select(). By default,
+ # we do not look for writability, because for large responses to small
+ # requests the socket will inevitably be writable. Each time the
+ # selector marks the socket as readable, we will attempt to read. This
+ # may raise EAGAIN or WANT_READ, either of which causes us to just loop
+ # again. However it may *also* raise WANT_WRITE. If it does, we will
+ # block the event loop until the socket returns *writable*, and then
+ # loop back around again.
+ self._selector.modify(self._sock, selectors.EVENT_READ)
+ response = None
+ while not isinstance(response, h11.Response):
+ response = self._read_until_event(
+ self._state_machine, self._read_timeout
+ )
+
+ if response.http_version not in _SUPPORTED_VERSIONS:
+ raise BadVersionError(response.http_version)
+
+ return _response_from_h11(response, self)
+
+ def _read_until_event(self, state_machine, read_timeout):
+ """
+ A selector loop that spins over the selector and socket, issuing reads
+ and feeding the data into h11 and checking whether h11 has an event for
+ us. The moment there is an event other than h11.NEED_DATA, this
+ function returns that event.
+ """
+ # While reading, we are looping around in select(). By default, we do
+ # not look for writability, because for large responses to small
+ # requests the socket will inevitably be writable. Each time the
+ # selector marks the socket as readable, we will attempt to read. This
+ # may raise EAGAIN or WANT_READ, either of which causes us to just loop
+ # again. However, it may *also* raise WANT_WRITE. If it does, we will
+ # block the event loop until the socket returns *writable*, and then
+ # loop back around again.
+ event = state_machine.next_event()
+ self._selector.modify(self._sock, selectors.EVENT_READ)
+ while event is h11.NEED_DATA:
+ selector_events = self._selector.select(read_timeout)
+ if not selector_events:
+ # TODO: Raise our own timeouts later.
+ raise socket.timeout()
+
+ # This `while` loop is present to prevent us doing too much
+ # selector polling. We already know the selector is readable: we
+ # don't need to ask again until a read actually succeeds or we get
+ # EAGAIN.
+ read_bytes = None
+ while read_bytes is None:
+ try:
+ read_bytes = _recv_or_eagain(self._sock)
+ except ssl.SSLWantWriteError:
+ _wait_for_event(
+ self._selector,
+ self._sock,
+ selectors.EVENT_WRITE,
+ read_timeout
+ )
+ else:
+ if read_bytes is not _EAGAIN:
+ state_machine.receive_data(read_bytes)
+ event = state_machine.next_event()
+
+ return event
def _tunnel(self, conn):
"""
@@ -347,17 +505,18 @@ class SyncHTTP1Connection(object):
self._selector.register(
self._sock, selectors.EVENT_READ | selectors.EVENT_WRITE
)
- self._send_unless_readable(bytes_to_send)
+ self._send_unless_readable(tunnel_state_machine, bytes_to_send)
# At this point we no longer care if the socket is writable.
self._selector.modify(self._sock, selectors.EVENT_READ)
response = None
- while response is None:
- # TODO: Add a timeout here.
- # TODO: Error handling.
- read_bytes = self._receive_bytes(read_timeout=None)
- response = _maybe_read_response(read_bytes, tunnel_state_machine)
+ while not isinstance(response, h11.Response):
+ # TODO: add a timeout here
+ # TODO: Error handling
+ response = self._read_until_event(
+ tunnel_state_machine, read_timeout=None
+ )
if response.status_code != 200:
response = _response_from_h11(response, self)
@@ -433,47 +592,6 @@ class SyncHTTP1Connection(object):
self._sock, selectors.EVENT_READ | selectors.EVENT_WRITE
)
- def send_request(self, request, read_timeout):
- """
- Sends a single Request object. Returns a Response.
- """
- # TODO: Replace read_timeout with something smarter.
- self._read_timeout = read_timeout
-
- # Before we begin, confirm that the state machine is ok.
- assert self._state_machine.our_state is h11.IDLE
- assert self._state_machine.their_state is h11.IDLE
-
- # First, register the socket with the selector. We want to look for
- # readability *and* writability, because if the socket suddenly becomes
- # readable we need to stop our upload immediately.
- self._selector.modify(
- self._sock, selectors.EVENT_READ | selectors.EVENT_WRITE
- )
- header_bytes = _request_to_bytes(request, self._state_machine)
- body_chunks = _body_bytes(request, self._state_machine)
- request_chunks = itertools.chain([header_bytes], body_chunks)
-
- for chunk in request_chunks:
- # If the socket becomes readable we don't need to error out or
- # anything: we can just continue with our current logic.
- readable = self._send_unless_readable(chunk)
- if readable:
- break
-
- # At this point we no longer care if the socket is writable.
- self._selector.modify(self._sock, selectors.EVENT_READ)
-
- response = None
- while response is None:
- read_bytes = self._receive_bytes(read_timeout)
- response = _maybe_read_response(read_bytes, self._state_machine)
-
- if response.http_version not in _SUPPORTED_VERSIONS:
- raise BadVersionError(response.http_version)
-
- return _response_from_h11(response, self)
-
def close(self):
"""
Close this connection, suitable for being re-added to a connection
@@ -563,18 +681,14 @@ class SyncHTTP1Connection(object):
if self._state_machine is None:
raise StopIteration()
- data = None
-
- while data is None:
- event = self._state_machine.next_event()
- if event is h11.NEED_DATA:
- received_bytes = self._receive_bytes(self._read_timeout)
- self._state_machine.receive_data(received_bytes)
- elif isinstance(event, h11.Data):
- data = bytes(event.data)
- elif isinstance(event, h11.EndOfMessage):
- self._reset()
- raise StopIteration()
+ event = self._read_until_event(
+ self._state_machine, read_timeout=self._read_timeout
+ )
+ if isinstance(event, h11.Data):
+ data = bytes(event.data)
+ elif isinstance(event, h11.EndOfMessage):
+ self._reset()
+ raise StopIteration()
return data