From 61bf923c605e9cecdd92090e7babeae59f3e310f Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 3 May 2017 17:05:37 +0100 Subject: We currently don't support ST in v2 --- test/contrib/test_securetransport.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/contrib/test_securetransport.py b/test/contrib/test_securetransport.py index 96fc57ea..390872b9 100644 --- a/test/contrib/test_securetransport.py +++ b/test/contrib/test_securetransport.py @@ -7,6 +7,8 @@ try: except ImportError as e: raise SkipTest('Could not import SecureTransport: %r' % e) +raise SkipTest('SecureTransport currently not supported in v2!') + from ..with_dummyserver.test_https import TestHTTPS, TestHTTPS_TLSv1 # noqa: F401 from ..with_dummyserver.test_socketlevel import ( # noqa: F401 TestSNI, TestSocketClosing, TestClientCerts -- cgit v1.2.1 From beb1e2b92133f3c2af3637465a6ef270b009467a Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 3 May 2017 17:27:09 +0100 Subject: Refactoring to handle EAGAIN, WANT_READ, WANT_WRITE --- urllib3/sync_connection.py | 306 +++++++++++++++++++++++++++++++-------------- 1 file changed, 211 insertions(+), 95 deletions(-) diff --git a/urllib3/sync_connection.py b/urllib3/sync_connection.py index 95b6991b..8b1c652a 100644 --- a/urllib3/sync_connection.py +++ b/urllib3/sync_connection.py @@ -16,6 +16,7 @@ from __future__ import absolute_import import collections import datetime +import errno import itertools import socket import warnings @@ -25,7 +26,8 @@ import h11 from .base import Request, Response from .exceptions import ( ConnectTimeoutError, NewConnectionError, SubjectAltNameWarning, - SystemTimeWarning, BadVersionError, FailedTunnelError, InvalidBodyError + SystemTimeWarning, BadVersionError, FailedTunnelError, InvalidBodyError, + ProtocolError ) from .packages import six from .util import selectors, connection, ssl_ as ssl_util @@ -43,6 +45,9 @@ RECENT_DATE = datetime.date(2016, 1, 1) _SUPPORTED_VERSIONS = frozenset([b'1.0', b'1.1']) +# A sentinel object returned when some syscalls return EAGAIN. +_EAGAIN = object() + def _headers_to_native_string(headers): """ @@ -141,24 +146,6 @@ def _body_bytes(request, state_machine): yield state_machine.send(h11.EndOfMessage()) -def _maybe_read_response(data, state_machine): - """ - Feeds some more data into the state machine and potentially returns a - response object. - """ - response = None - event = None - state_machine.receive_data(data) - - while event is not h11.NEED_DATA: - event = state_machine.next_event() - if isinstance(event, h11.Response): - response = event - break - - return response - - def _response_from_h11(h11_response, body_object): """ Given a h11 Response object, build a urllib3 response object and return it. @@ -195,6 +182,64 @@ def _build_tunnel_request(host, port, headers): return tunnel_request +def _wait_for_event(selector, sock, event, timeout): + """ + Waits for a specific event on a socket for no more than the time in + timeout. Throws an exception if the timeout is exceeded. + """ + old_events = selector.get_key(sock).events + try: + selector.modify(sock, event) + if not selector.select(timeout=timeout): + # TODO: Raise our own timeouts later + raise sock.timeout() + return + finally: + selector.modify(sock, old_events) + + +def _recv_or_eagain(sock): + """ + Calls recv on a non-blocking socket. Returns the number of bytes read or + the sentinel object _EAGAIN. + """ + try: + return sock.recv(65536) + except (OSError, socket.error) as e: + errcode = None + if hasattr(e, "errno"): + errcode = e.errno + elif hasattr(e, "args"): + errcode = e.args[0] + + if errcode == errno.EAGAIN: + return _EAGAIN + raise + except ssl.SSLWantReadError: + return _EAGAIN + + +def _write_or_eagain(sock, data): + """ + Calls send on a non-blocking socket. Returns the number of bytes written or + the sentinel object _EAGAIN. + """ + try: + return sock.send(data) + except (OSError, socket.error) as e: + errcode = None + if hasattr(e, "errno"): + errcode = e.errno + elif hasattr(e, "args"): + errcode = e.args[0] + + if errcode == errno.EAGAIN: + return _EAGAIN + raise + except ssl.SSLWantWriteError: + return _EAGAIN + + _DEFAULT_SOCKET_OPTIONS = object() @@ -280,7 +325,7 @@ class SyncHTTP1Connection(object): return conn - def _send_unless_readable(self, data): + def _send_unless_readable(self, state_machine, data): """ This method sends the data in ``data`` on the given socket. It will abort early if the socket became readable for any reason. @@ -288,6 +333,10 @@ class SyncHTTP1Connection(object): If the socket became readable, this returns True. Otherwise, returns False. """ + # First, register the socket with the selector. + self._selector.modify( + self._sock, selectors.EVENT_READ | selectors.EVENT_WRITE + ) # We take a memoryview here because if the chunk is very large we're # going to slice it a few times, and we'd like to avoid doing copies as # we do that. @@ -296,30 +345,141 @@ class SyncHTTP1Connection(object): while chunk: events = self._selector.select()[0][1] # TODO: timeout! - # If the socket is readable, we stop uploading. + # The "happy path" here is that the socket has become marked + # writable. If that happens, we just call send. If this returns + # EAGAIN or SSL_WANT_WRITE, that's fine, we just spin around again. + # + # The less happy path here is that the socket has become marked + # *readable*. That is...problematic. It may be the case that there + # is data to receive from the remote peer. If there is, we want to + # stop uploading. However, in the TLS case this data may be + # triggering a TLS renegotiation, so the simple fact that the + # socket is readable is not a bug. So what we do is attempt to call + # recv. If it returns data, we shove it into our state machine and + # then break from the loop. If it returns EAGAIN, we assume that + # it was just TLS stuff and move on. + # + # Note that we only *actually* break from the loop if and when we + # get an actual final response header block. Prior to that point we + # will keep sending data. This allows 1XX header blocks to also be + # ignored. if events & selectors.EVENT_READ: + data = _recv_or_eagain(self._sock) + if data is _EAGAIN: + continue + + state_machine.receive_data(data) return True - assert events & selectors.EVENT_WRITE - chunk_sent = self._sock.send(chunk) - chunk = chunk[chunk_sent:] + if events & selectors.EVENT_WRITE: + try: + bytes_written = _write_or_eagain(self._sock, chunk) + except ssl.SSLWantReadError: + # This is unlikely, but we should still tolerate it. + _wait_for_event( + self._selector, + self._sock, + selectors.EVENT_READ, + None # TODO: Timeout! + ) + continue + + if bytes_written is not _EAGAIN: + chunk = chunk[bytes_written:] return False - def _receive_bytes(self, read_timeout): + def send_request(self, request, read_timeout): """ - This method blocks until the socket is readable or the read times out - (TODO), and then returns whatever data was read. Signals EOF the same - way ``recv`` does: by returning the empty string. + Just a stub to work out what the hell the sending loop should be. """ - keys = self._selector.select(read_timeout) - if not keys: - # TODO: Raise our own timeouts later. - raise socket.timeout() - events = keys[0][1] - assert events == selectors.EVENT_READ - data = self._sock.recv(65536) - return data + # Step 1: Send Request. + # TODO: Replace read_timeout with something smarter. + self._read_timeout = read_timeout + + # Before we begin, confirm that the state machine is ok. + if (self._state_machine.our_state is not h11.IDLE or + self._state_machine.their_state is not h11.IDLE): + raise ProtocolError("Invalid internal state transition") + + header_bytes = _request_to_bytes(request, self._state_machine) + body_chunks = _body_bytes(request, self._state_machine) + request_chunks = itertools.chain([header_bytes], body_chunks) + response = None + + # First, register the socket with the selector. + self._selector.modify( + self._sock, selectors.EVENT_READ | selectors.EVENT_WRITE + ) + + # Next, send the body. + for chunk in request_chunks: + did_read = self._send_unless_readable(self._state_machine, chunk) + if did_read: + break + + # Ok, we've sent the request. Now we want to read the response. This + # needs a different loop, slightly. + # + # While reading, we are again looping around in select(). By default, + # we do not look for writability, because for large responses to small + # requests the socket will inevitably be writable. Each time the + # selector marks the socket as readable, we will attempt to read. This + # may raise EAGAIN or WANT_READ, either of which causes us to just loop + # again. However it may *also* raise WANT_WRITE. If it does, we will + # block the event loop until the socket returns *writable*, and then + # loop back around again. + self._selector.modify(self._sock, selectors.EVENT_READ) + response = None + while not isinstance(response, h11.Response): + response = self._read_until_event( + self._state_machine, self._read_timeout + ) + + if response.http_version not in _SUPPORTED_VERSIONS: + raise BadVersionError(response.http_version) + + return _response_from_h11(response, self) + + def _read_until_event(self, state_machine, read_timeout): + """ + A selector loop that spins over the selector and socket, issuing reads + and feeding the data into h11 and checking whether h11 has an event for + us. The moment there is an event other than h11.NEED_DATA, this + function returns that event. + """ + # While reading, we are looping around in select(). By default, we do + # not look for writability, because for large responses to small + # requests the socket will inevitably be writable. Each time the + # selector marks the socket as readable, we will attempt to read. This + # may raise EAGAIN or WANT_READ, either of which causes us to just loop + # again. However, it may *also* raise WANT_WRITE. If it does, we will + # block the event loop until the socket returns *writable*, and then + # loop back around again. + event = state_machine.next_event() + self._selector.modify(self._sock, selectors.EVENT_READ) + while event is h11.NEED_DATA: + selector_events = self._selector.select(read_timeout) + if not selector_events: + # TODO: Raise our own timeouts later. + raise socket.timeout() + + try: + read_bytes = _recv_or_eagain(self._sock) + except ssl.SSLWantWriteError: + _wait_for_event( + self._selector, + self._sock, + selectors.EVENT_WRITE, + read_timeout + ) + continue + + if read_bytes is not _EAGAIN: + state_machine.receive_data(read_bytes) + event = state_machine.next_event() + + return event def _tunnel(self, conn): """ @@ -347,17 +507,18 @@ class SyncHTTP1Connection(object): self._selector.register( self._sock, selectors.EVENT_READ | selectors.EVENT_WRITE ) - self._send_unless_readable(bytes_to_send) + self._send_unless_readable(tunnel_state_machine, bytes_to_send) # At this point we no longer care if the socket is writable. self._selector.modify(self._sock, selectors.EVENT_READ) response = None - while response is None: - # TODO: Add a timeout here. - # TODO: Error handling. - read_bytes = self._receive_bytes(read_timeout=None) - response = _maybe_read_response(read_bytes, tunnel_state_machine) + while not isinstance(response, h11.Response): + # TODO: add a timeout here + # TODO: Error handling + response = self._read_until_event( + tunnel_state_machine, read_timeout=None + ) if response.status_code != 200: response = _response_from_h11(response, self) @@ -433,47 +594,6 @@ class SyncHTTP1Connection(object): self._sock, selectors.EVENT_READ | selectors.EVENT_WRITE ) - def send_request(self, request, read_timeout): - """ - Sends a single Request object. Returns a Response. - """ - # TODO: Replace read_timeout with something smarter. - self._read_timeout = read_timeout - - # Before we begin, confirm that the state machine is ok. - assert self._state_machine.our_state is h11.IDLE - assert self._state_machine.their_state is h11.IDLE - - # First, register the socket with the selector. We want to look for - # readability *and* writability, because if the socket suddenly becomes - # readable we need to stop our upload immediately. - self._selector.modify( - self._sock, selectors.EVENT_READ | selectors.EVENT_WRITE - ) - header_bytes = _request_to_bytes(request, self._state_machine) - body_chunks = _body_bytes(request, self._state_machine) - request_chunks = itertools.chain([header_bytes], body_chunks) - - for chunk in request_chunks: - # If the socket becomes readable we don't need to error out or - # anything: we can just continue with our current logic. - readable = self._send_unless_readable(chunk) - if readable: - break - - # At this point we no longer care if the socket is writable. - self._selector.modify(self._sock, selectors.EVENT_READ) - - response = None - while response is None: - read_bytes = self._receive_bytes(read_timeout) - response = _maybe_read_response(read_bytes, self._state_machine) - - if response.http_version not in _SUPPORTED_VERSIONS: - raise BadVersionError(response.http_version) - - return _response_from_h11(response, self) - def close(self): """ Close this connection, suitable for being re-added to a connection @@ -563,18 +683,14 @@ class SyncHTTP1Connection(object): if self._state_machine is None: raise StopIteration() - data = None - - while data is None: - event = self._state_machine.next_event() - if event is h11.NEED_DATA: - received_bytes = self._receive_bytes(self._read_timeout) - self._state_machine.receive_data(received_bytes) - elif isinstance(event, h11.Data): - data = bytes(event.data) - elif isinstance(event, h11.EndOfMessage): - self._reset() - raise StopIteration() + event = self._read_until_event( + self._state_machine, read_timeout=self._read_timeout + ) + if isinstance(event, h11.Data): + data = bytes(event.data) + elif isinstance(event, h11.EndOfMessage): + self._reset() + raise StopIteration() return data -- cgit v1.2.1 From 69c874384b9c04d1a2386dcca53bf644d7c7df8a Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Fri, 5 May 2017 14:53:08 +0100 Subject: The WANT* errors are instances of OSError! --- urllib3/sync_connection.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/urllib3/sync_connection.py b/urllib3/sync_connection.py index 8b1c652a..eba2a08c 100644 --- a/urllib3/sync_connection.py +++ b/urllib3/sync_connection.py @@ -205,6 +205,8 @@ def _recv_or_eagain(sock): """ try: return sock.recv(65536) + except ssl.SSLWantReadError: + return _EAGAIN except (OSError, socket.error) as e: errcode = None if hasattr(e, "errno"): @@ -215,8 +217,6 @@ def _recv_or_eagain(sock): if errcode == errno.EAGAIN: return _EAGAIN raise - except ssl.SSLWantReadError: - return _EAGAIN def _write_or_eagain(sock, data): @@ -226,6 +226,8 @@ def _write_or_eagain(sock, data): """ try: return sock.send(data) + except ssl.SSLWantWriteError: + return _EAGAIN except (OSError, socket.error) as e: errcode = None if hasattr(e, "errno"): @@ -236,8 +238,6 @@ def _write_or_eagain(sock, data): if errcode == errno.EAGAIN: return _EAGAIN raise - except ssl.SSLWantWriteError: - return _EAGAIN _DEFAULT_SOCKET_OPTIONS = object() -- cgit v1.2.1 From e39c8f0e69553f8cb65cbf66972977a7f30a0574 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Fri, 5 May 2017 15:27:04 +0100 Subject: Avoid excessive selector polling --- urllib3/sync_connection.py | 66 ++++++++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/urllib3/sync_connection.py b/urllib3/sync_connection.py index eba2a08c..7bf2f055 100644 --- a/urllib3/sync_connection.py +++ b/urllib3/sync_connection.py @@ -372,20 +372,25 @@ class SyncHTTP1Connection(object): return True if events & selectors.EVENT_WRITE: - try: - bytes_written = _write_or_eagain(self._sock, chunk) - except ssl.SSLWantReadError: - # This is unlikely, but we should still tolerate it. - _wait_for_event( - self._selector, - self._sock, - selectors.EVENT_READ, - None # TODO: Timeout! - ) - continue - - if bytes_written is not _EAGAIN: - chunk = chunk[bytes_written:] + # This `while` loop is present to prevent us doing too much + # selector polling. We already know the selector is writable: + # we don't need to ask again until a write actually succeeds or + # we get EAGAIN. + bytes_written = None + while bytes_written is None: + try: + bytes_written = _write_or_eagain(self._sock, chunk) + except ssl.SSLWantReadError: + # This is unlikely, but we should still tolerate it. + _wait_for_event( + self._selector, + self._sock, + selectors.EVENT_READ, + None # TODO: Timeout! + ) + else: + if bytes_written is not _EAGAIN: + chunk = chunk[bytes_written:] return False @@ -464,20 +469,25 @@ class SyncHTTP1Connection(object): # TODO: Raise our own timeouts later. raise socket.timeout() - try: - read_bytes = _recv_or_eagain(self._sock) - except ssl.SSLWantWriteError: - _wait_for_event( - self._selector, - self._sock, - selectors.EVENT_WRITE, - read_timeout - ) - continue - - if read_bytes is not _EAGAIN: - state_machine.receive_data(read_bytes) - event = state_machine.next_event() + # This `while` loop is present to prevent us doing too much + # selector polling. We already know the selector is readable: we + # don't need to ask again until a read actually succeeds or we get + # EAGAIN. + read_bytes = None + while read_bytes is None: + try: + read_bytes = _recv_or_eagain(self._sock) + except ssl.SSLWantWriteError: + _wait_for_event( + self._selector, + self._sock, + selectors.EVENT_WRITE, + read_timeout + ) + else: + if read_bytes is not _EAGAIN: + state_machine.receive_data(read_bytes) + event = state_machine.next_event() return event -- cgit v1.2.1 From e213eadca6cdb5c50f287215767760121296a65c Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Fri, 5 May 2017 15:39:22 +0100 Subject: Seems not to be needed even for Py2 --- urllib3/sync_connection.py | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/urllib3/sync_connection.py b/urllib3/sync_connection.py index 7bf2f055..93ea72bc 100644 --- a/urllib3/sync_connection.py +++ b/urllib3/sync_connection.py @@ -208,13 +208,7 @@ def _recv_or_eagain(sock): except ssl.SSLWantReadError: return _EAGAIN except (OSError, socket.error) as e: - errcode = None - if hasattr(e, "errno"): - errcode = e.errno - elif hasattr(e, "args"): - errcode = e.args[0] - - if errcode == errno.EAGAIN: + if e.errno == errno.EAGAIN: return _EAGAIN raise @@ -229,13 +223,7 @@ def _write_or_eagain(sock, data): except ssl.SSLWantWriteError: return _EAGAIN except (OSError, socket.error) as e: - errcode = None - if hasattr(e, "errno"): - errcode = e.errno - elif hasattr(e, "args"): - errcode = e.args[0] - - if errcode == errno.EAGAIN: + if e.errno == errno.EAGAIN: return _EAGAIN raise -- cgit v1.2.1 From f2ca0a8d3c128f66c4ca7360ed4b0050f3a230ae Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Sun, 7 May 2017 17:55:48 +0100 Subject: Better docstring --- urllib3/sync_connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/urllib3/sync_connection.py b/urllib3/sync_connection.py index 93ea72bc..425cf242 100644 --- a/urllib3/sync_connection.py +++ b/urllib3/sync_connection.py @@ -384,7 +384,7 @@ class SyncHTTP1Connection(object): def send_request(self, request, read_timeout): """ - Just a stub to work out what the hell the sending loop should be. + Given a Request object, performs the logic required to get a response. """ # Step 1: Send Request. # TODO: Replace read_timeout with something smarter. -- cgit v1.2.1 From da8368a3f2d53fcec136806349e2631dc2c363c7 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Mon, 8 May 2017 09:48:23 +0100 Subject: Add low-level tests for socket behaviour --- test/test_sync_connection.py | 434 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 434 insertions(+) create mode 100644 test/test_sync_connection.py diff --git a/test/test_sync_connection.py b/test/test_sync_connection.py new file mode 100644 index 00000000..b5680a74 --- /dev/null +++ b/test/test_sync_connection.py @@ -0,0 +1,434 @@ +""" +Low-level synchronous connection tests. + +These tests involve mocking out the network layer to cause specific unusual +behaviours to occur. The goal is to ensure that the synchronous connection +layer can handle unexpected network weather without falling over, and without +expending undue effort to arrange that these effects actually happen on a real +network. +""" +import collections +import errno +import socket +import ssl +import unittest + +import h11 + +from urllib3.base import Request +from urllib3.sync_connection import SyncHTTP1Connection +from urllib3.util import selectors + + +# Objects and globals for handling scenarios. +Event = collections.namedtuple('Event', ['expected_object', 'event', 'meta']) + +SELECTOR = "SELECTOR" +SOCKET = "SOCKET" +RAISE_EAGAIN = "RAISE_EAGAIN" +RAISE_WANT_READ = "RAISE_WANT_READ" +RAISE_WANT_WRITE = "RAISE_WANT_WRITE" + +EVENT_SELECT = "EVENT_SELECT" + +EVENT_SEND = "EVENT_SEND" +SEND_ALL = "SEND_ALL" + +EVENT_RECV = "EVENT_RECV" +RECV_ALL = "RECV_ALL" + + +# A number of helpful shorthands for common events. +SELECT_UPLOAD_WRITE = Event( + SELECTOR, + EVENT_SELECT, + (selectors.EVENT_READ | selectors.EVENT_WRITE, selectors.EVENT_WRITE) +) +SELECT_UPLOAD_READ = Event( + SELECTOR, + EVENT_SELECT, + (selectors.EVENT_READ | selectors.EVENT_WRITE, selectors.EVENT_READ) +) +SELECT_DOWNLOAD_READ = Event( + SELECTOR, EVENT_SELECT, (selectors.EVENT_READ, selectors.EVENT_READ) +) +SELECT_DOWNLOAD_WRITE = Event( + SELECTOR, EVENT_SELECT, (selectors.EVENT_READ, selectors.EVENT_READ) +) +SELECT_WRITABLE_WRITE = Event( + SELECTOR, EVENT_SELECT, (selectors.EVENT_WRITE, selectors.EVENT_WRITE) +) +SOCKET_SEND_ALL = Event(SOCKET, EVENT_SEND, (SEND_ALL,)) +SOCKET_SEND_5 = Event(SOCKET, EVENT_SEND, (5,)) +SOCKET_SEND_EAGAIN = Event(SOCKET, EVENT_SEND, (RAISE_EAGAIN,)) +SOCKET_SEND_WANTREAD = Event(SOCKET, EVENT_SEND, (RAISE_WANT_READ,)) +SOCKET_SEND_WANTWRITE = Event(SOCKET, EVENT_SEND, (RAISE_WANT_WRITE,)) +SOCKET_RECV_ALL = Event(SOCKET, EVENT_RECV, (RECV_ALL,)) +SOCKET_RECV_5 = Event(SOCKET, EVENT_RECV, (5,)) +SOCKET_RECV_EAGAIN = Event(SOCKET, EVENT_RECV, (RAISE_EAGAIN,)) +SOCKET_RECV_WANTREAD = Event(SOCKET, EVENT_RECV, (RAISE_WANT_READ,)) +SOCKET_RECV_WANTWRITE = Event(SOCKET, EVENT_RECV, (RAISE_WANT_WRITE,)) + + +REQUEST = ( + b'GET / HTTP/1.1\r\n' + b'host: localhost\r\n' + b'\r\n' +) +RESPONSE = ( + b'HTTP/1.1 200 OK\r\n' + b'Server: totallyarealserver/1.0.0\r\n' + b'Content-Length: 8\r\n' + b'Content-Type: text/plain\r\n' + b'\r\n' + b'complete' +) + + +class ScenarioError(Exception): + """ + An error occurred with running the scenario. + """ + pass + + +class ScenarioSelector(object): + """ + A mock Selector object. This selector implements a tiny bit of the selector + API (only that which is used by the higher layers), and response to select + based on the scenario it is provided. + """ + def __init__(self, scenario, sock): + self._scenario = scenario + self._fd = sock + self._events = None + + def register(self, fd, events): + if fd is not self._fd: + raise ScenarioError("Registered unexpected socket!") + self._events = events + + def modify(self, fd, events): + if fd is not self._fd: + raise ScenarioError("Modifying unexpected socket!") + self._events = events + + def select(self, timeout=None): + expected_object, event, args = self._scenario.pop(0) + if expected_object is not SELECTOR: + raise ScenarioError("Received non selector event!") + + if event is not EVENT_SELECT: + raise ScenarioError("Expected EVENT_SELECT, got %s" % event) + + expected_events, returned_event = args + if self._events != expected_events: + raise ScenarioError( + "Expected events %s, got %s" % (self._events, expected_events) + ) + + key = self.get_key(self._fd) + return [(key, returned_event)] + + def get_key(self, fd): + if fd is not self._fd: + raise ScenarioError("Querying unexpected socket!") + return selectors.SelectorKey( + self._fd, + 1, + self._events, + None + ) + + def close(self): + pass + + +class ScenarioSocket(object): + """ + A mock Socket object. This object implements a tiny bit of the socket API + (only that which is used by the synchronous connection), and responds to + socket calls based on the scenario it is provided. + """ + def __init__(self, scenario): + self._scenario = scenario + self._data_to_send = RESPONSE + self._data_sent = b'' + self._closed = False + + def _raise_errors(self, possible_error): + if possible_error is RAISE_EAGAIN: + raise socket.error(errno.EAGAIN, "try again later") + elif possible_error is RAISE_WANT_READ: + raise ssl.SSLWantReadError("Want read") + elif possible_error is RAISE_WANT_WRITE: + raise ssl.SSLWantWriteError("Want write") + + def send(self, data): + expected_object, event, args = self._scenario.pop(0) + if expected_object is not SOCKET: + raise ScenarioError("Received non selector event!") + + if event is not EVENT_SEND: + raise ScenarioError("Expected EVENT_SEND, got %s" % event) + + amount, = args + self._raise_errors(amount) + if amount is SEND_ALL: + amount = len(data) + + self._data_sent += data[:amount].tobytes() + return amount + + def recv(self, amt): + expected_object, event, args = self._scenario.pop(0) + if expected_object is not SOCKET: + raise ScenarioError("Received non selector event!") + + if event is not EVENT_RECV: + raise ScenarioError("Expected EVENT_RECV, got %s" % event) + + amount, = args + self._raise_errors(amount) + if amount is RECV_ALL: + amount = min(len(RESPONSE), amt) + + rdata = self._data_to_send[:amount] + self._data_to_send = self._data_to_send[amount:] + return rdata + + def setblocking(self, *args): + pass + + def close(self): + self._closed = True + + +class TestUnusualSocketConditions(unittest.TestCase): + """ + This class contains tests that take strict control over sockets and + selectors. The goal here is to simulate unusual network conditions that are + extremely difficult to reproducibly simulate even with socketlevel tests in + which we control both ends of the connection. For example, these tests + will trigger WANT_READ and WANT_WRITE errors in TLS stacks which are + otherwise extremely hard to trigger, and will also fire EAGAIN on sockets + marked readable/writable, which can technically happen but are extremely + tricky to trigger by using actual sockets and the loopback interface. + + These tests are necessarily not a perfect replacement for actual realworld + examples, but those are so prohibitively difficult to trigger that these + will have to do instead. + """ + # A stub value of the read timeout that will be used by the selector. + # This should not be edited by tests: only used as a reference for what + # delay values they can use to force things to time out. + READ_TIMEOUT = 5 + + def run_scenario(self, scenario): + conn = SyncHTTP1Connection('localhost', 80) + conn._state_machine = h11.Connection(our_role=h11.CLIENT) + conn._sock = sock = ScenarioSocket(scenario) + conn._selector = ScenarioSelector(scenario, sock) + + request = Request(method=b'GET', target=b'/') + request.add_host(host=b'localhost', port=80, scheme='http') + response = conn.send_request(request, read_timeout=self.READ_TIMEOUT) + body = b''.join(response.body) + + # The scenario should be totally consumed. + self.assertFalse(scenario) + + # Validate that the response is complete. + self.assertEqual(response.status_code, 200) + self.assertEqual(body, b'complete') + self.assertEqual(response.version, b'HTTP/1.1') + self.assertEqual(len(response.headers), 3) + self.assertEqual(response.headers['server'], 'totallyarealserver/1.0.0') + self.assertEqual(response.headers['content-length'], '8') + self.assertEqual(response.headers['content-type'], 'text/plain') + + return sock + + def test_happy_path(self): + """ + When everything goes smoothly, the response is cleanly consumed. + """ + scenario = [ + SELECT_UPLOAD_WRITE, + SOCKET_SEND_ALL, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_ALL, + ] + sock = self.run_scenario(scenario) + self.assertEqual(sock._data_sent, REQUEST) + + def test_handle_recv_eagain_download(self): + """ + When a socket is marked readable during response body download but + returns EAGAIN when read from, the code simply retries the read. + """ + scenario = [ + SELECT_UPLOAD_WRITE, + SOCKET_SEND_ALL, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_EAGAIN, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_EAGAIN, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_ALL, + ] + sock = self.run_scenario(scenario) + self.assertEqual(sock._data_sent, REQUEST) + + def test_handle_recv_want_read_download(self): + """ + When a socket is marked readable during response body download but + returns SSL_WANT_READ when read from, the code simply retries the read. + """ + scenario = [ + SELECT_UPLOAD_WRITE, + SOCKET_SEND_ALL, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_WANTREAD, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_WANTREAD, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_ALL, + ] + sock = self.run_scenario(scenario) + self.assertEqual(sock._data_sent, REQUEST) + + def test_handle_recv_eagain_upload(self): + """ + When a socket is marked readable during request upload but returns + EAGAIN when read from, the code ignores it and continues with upload. + """ + scenario = [ + SELECT_UPLOAD_WRITE, + SOCKET_SEND_5, + SELECT_UPLOAD_READ, + SOCKET_RECV_EAGAIN, + SELECT_UPLOAD_WRITE, + SOCKET_SEND_ALL, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_ALL, + ] + sock = self.run_scenario(scenario) + self.assertEqual(sock._data_sent, REQUEST) + + def test_handle_recv_wantread_upload(self): + """ + When a socket is marked readable during request upload but returns + WANT_READ when read from, the code ignores it and continues with upload. + """ + scenario = [ + SELECT_UPLOAD_WRITE, + SOCKET_SEND_5, + SELECT_UPLOAD_READ, + SOCKET_RECV_WANTREAD, + SELECT_UPLOAD_WRITE, + SOCKET_SEND_ALL, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_ALL, + ] + sock = self.run_scenario(scenario) + self.assertEqual(sock._data_sent, REQUEST) + + def test_handle_send_eagain_upload(self): + """ + When a socket is marked writable during request upload but returns + EAGAIN when written to, the code ignores it and continues with upload. + """ + scenario = [ + SELECT_UPLOAD_WRITE, + SOCKET_SEND_5, + SELECT_UPLOAD_WRITE, + SOCKET_SEND_EAGAIN, + SELECT_UPLOAD_WRITE, + SOCKET_SEND_ALL, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_ALL, + ] + sock = self.run_scenario(scenario) + self.assertEqual(sock._data_sent, REQUEST) + + def test_handle_send_wantwrite_upload(self): + """ + When a socket is marked writable during request upload but returns + WANT_WRITE when written to, the code ignores it and continues with + upload. + """ + scenario = [ + SELECT_UPLOAD_WRITE, + SOCKET_SEND_5, + SELECT_UPLOAD_WRITE, + SOCKET_SEND_WANTWRITE, + SELECT_UPLOAD_WRITE, + SOCKET_SEND_ALL, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_ALL, + ] + sock = self.run_scenario(scenario) + self.assertEqual(sock._data_sent, REQUEST) + + def test_handle_early_response(self): + """ + When a socket is marked readable during request upload, and any data is + read from the socket, the upload immediately stops and the response is + read. + """ + scenario = [ + SELECT_UPLOAD_WRITE, + SOCKET_SEND_5, + SELECT_UPLOAD_READ, + SOCKET_RECV_5, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_ALL, + ] + sock = self.run_scenario(scenario) + self.assertEqual(sock._data_sent, REQUEST[:5]) + self.assertTrue(sock._closed) + + def test_handle_want_read_during_upload(self): + """ + When a socket is marked writable during request upload but returns + WANT_READ when written to, the code waits for the socket to become + readable and issues the write again. + """ + scenario = [ + SELECT_UPLOAD_WRITE, + SOCKET_SEND_5, + # Return WANT_READ twice for good measure. + SELECT_UPLOAD_WRITE, + SOCKET_SEND_WANTREAD, + SELECT_DOWNLOAD_READ, + SOCKET_SEND_WANTREAD, + SELECT_DOWNLOAD_READ, + SOCKET_SEND_ALL, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_ALL, + ] + sock = self.run_scenario(scenario) + self.assertEqual(sock._data_sent, REQUEST) + + def test_handle_want_write_during_download(self): + """ + When a socket is marked readable during response download but returns + WANT_WRITE when read from, the code waits for the socket to become + writable and issues the read again. + """ + scenario = [ + SELECT_UPLOAD_WRITE, + SOCKET_SEND_ALL, + # Return WANT_WRITE twice for good measure. + SELECT_DOWNLOAD_READ, + SOCKET_RECV_WANTWRITE, + SELECT_WRITABLE_WRITE, + SOCKET_RECV_WANTWRITE, + SELECT_WRITABLE_WRITE, + SOCKET_RECV_5, + SELECT_DOWNLOAD_READ, + SOCKET_RECV_ALL, + ] + sock = self.run_scenario(scenario) + self.assertEqual(sock._data_sent, REQUEST) -- cgit v1.2.1