diff options
| author | Gabriel Falcão <gabriel@nacaolivre.org> | 2018-11-04 23:34:43 +0100 |
|---|---|---|
| committer | Gabriel Falcão <gabriel@nacaolivre.org> | 2018-11-04 23:34:43 +0100 |
| commit | 067c4bda72a442e1771374f9d69a5a1c77c88de5 (patch) | |
| tree | 907b5303ea0c77fbc64daa67e79920577c8edc2f | |
| parent | 2314893ec5ab46513e91ab867d7b55a72968909e (diff) | |
| download | httpretty-black.tar.gz | |
prettify code with blackblack
| -rw-r--r-- | docs/source/conf.py | 70 | ||||
| -rw-r--r-- | httpretty/compat.py | 40 | ||||
| -rw-r--r-- | httpretty/core.py | 606 | ||||
| -rw-r--r-- | httpretty/errors.py | 4 | ||||
| -rw-r--r-- | httpretty/http.py | 22 | ||||
| -rw-r--r-- | httpretty/utils.py | 6 | ||||
| -rw-r--r-- | httpretty/version.py | 2 | ||||
| -rw-r--r-- | setup.py | 54 | ||||
| -rw-r--r-- | tests/functional/__init__.py | 3 | ||||
| -rw-r--r-- | tests/functional/base.py | 30 | ||||
| -rw-r--r-- | tests/functional/test_bypass.py | 68 | ||||
| -rw-r--r-- | tests/functional/test_debug.py | 24 | ||||
| -rw-r--r-- | tests/functional/test_decorator.py | 62 | ||||
| -rw-r--r-- | tests/functional/test_fakesocket.py | 23 | ||||
| -rw-r--r-- | tests/functional/test_httplib2.py | 309 | ||||
| -rw-r--r-- | tests/functional/test_passthrough.py | 16 | ||||
| -rw-r--r-- | tests/functional/test_requests.py | 748 | ||||
| -rw-r--r-- | tests/functional/test_urllib2.py | 258 | ||||
| -rw-r--r-- | tests/functional/testserver.py | 20 | ||||
| -rw-r--r-- | tests/pyopenssl/__init__.py | 3 | ||||
| -rw-r--r-- | tests/pyopenssl/test_mock.py | 15 | ||||
| -rw-r--r-- | tests/unit/test_core.py | 470 | ||||
| -rw-r--r-- | tests/unit/test_httpretty.py | 320 | ||||
| -rw-r--r-- | tests/unit/test_main.py | 10 |
24 files changed, 1667 insertions, 1516 deletions
diff --git a/docs/source/conf.py b/docs/source/conf.py index 0dea7ef..b345c9b 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -2,21 +2,22 @@ import sys import sphinx_rtd_theme + try: from pathlib2 import Path except ImportError: from pathlib import Path -project_path = Path(__file__).absolute().parent.joinpath('../..') +project_path = Path(__file__).absolute().parent.joinpath("../..") sys.path.insert(0, project_path.as_posix()) -from httpretty.version import version # noqa +from httpretty.version import version # noqa -project = 'HTTPretty' -copyright = '2018, Gabriel Falcao' -author = 'Gabriel Falcao' +project = "HTTPretty" +copyright = "2018, Gabriel Falcao" +author = "Gabriel Falcao" # The short X.Y version version = version @@ -25,47 +26,49 @@ release = version extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.doctest', - 'sphinx.ext.intersphinx', - 'sphinx.ext.coverage', - 'sphinx.ext.ifconfig', - 'sphinx.ext.viewcode', - 'sphinx.ext.githubpages', - 'sphinx.ext.autosummary', + "sphinx.ext.autodoc", + "sphinx.ext.doctest", + "sphinx.ext.intersphinx", + "sphinx.ext.coverage", + "sphinx.ext.ifconfig", + "sphinx.ext.viewcode", + "sphinx.ext.githubpages", + "sphinx.ext.autosummary", ] -templates_path = ['_templates'] +templates_path = ["_templates"] -source_suffix = '.rst' -master_doc = 'index' +source_suffix = ".rst" +master_doc = "index" language = None exclude_patterns = [] -pygments_style = 'friendly' +pygments_style = "friendly" html_theme = "sphinx_rtd_theme" html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] # html_theme_options = {} -html_static_path = ['_static'] +html_static_path = ["_static"] -htmlhelp_basename = 'HTTPrettydoc' +htmlhelp_basename = "HTTPrettydoc" latex_elements = {} latex_documents = [ - (master_doc, 'HTTPretty.tex', 'HTTPretty Documentation', - 'Gabriel Falcao', 'manual'), + (master_doc, "HTTPretty.tex", "HTTPretty Documentation", "Gabriel Falcao", "manual") ] -man_pages = [ - (master_doc, 'httpretty', 'HTTPretty Documentation', - [author], 1) -] +man_pages = [(master_doc, "httpretty", "HTTPretty Documentation", [author], 1)] texinfo_documents = [ - (master_doc, 'HTTPretty', 'HTTPretty Documentation', - author, 'HTTPretty', 'One line description of project.', - 'Miscellaneous'), + ( + master_doc, + "HTTPretty", + "HTTPretty Documentation", + author, + "HTTPretty", + "One line description of project.", + "Miscellaneous", + ) ] @@ -74,11 +77,10 @@ epub_author = author epub_publisher = author epub_copyright = copyright -epub_exclude_files = ['search.html'] +epub_exclude_files = ["search.html"] intersphinx_mapping = { - 'python': ('https://docs.python.org/3', None), - - 'httplib2': ('https://httplib2.readthedocs.io/en/latest/', None), - 'requests': ('http://docs.python-requests.org/en/master/', None), - 'urllib3': ('https://urllib3.readthedocs.io/en/latest/', None), + "python": ("https://docs.python.org/3", None), + "httplib2": ("https://httplib2.readthedocs.io/en/latest/", None), + "requests": ("http://docs.python-requests.org/en/master/", None), + "urllib3": ("https://urllib3.readthedocs.io/en/latest/", None), } diff --git a/httpretty/compat.py b/httpretty/compat.py index 969662c..a180593 100644 --- a/httpretty/compat.py +++ b/httpretty/compat.py @@ -37,18 +37,18 @@ if PY3: # pragma: no cover else: # pragma: no cover import StringIO + StringIO = StringIO.StringIO basestring = string_types class BaseClass(object): - def __repr__(self): ret = self.__str__() if PY3: # pragma: no cover return ret else: - return ret.encode('utf-8') + return ret.encode("utf-8") try: # pragma: no cover @@ -59,25 +59,27 @@ try: # pragma: no cover from urllib.parse import quote_plus from urllib.parse import unquote from urllib.parse import urlencode + unquote_utf8 = unquote def encode_obj(in_obj): return in_obj + + except ImportError: # pragma: no cover from urlparse import urlsplit, urlunsplit, parse_qs, unquote from urllib import quote, quote_plus, urlencode def unquote_utf8(qs): if isinstance(qs, text_type): - qs = qs.encode('utf-8') + qs = qs.encode("utf-8") s = unquote(qs) if isinstance(s, binary_type): - return s.decode('utf-8', errors='ignore') + return s.decode("utf-8", errors="ignore") else: return s def encode_obj(in_obj): - def encode_list(in_list): out_list = [] for el in in_list: @@ -91,7 +93,7 @@ except ImportError: # pragma: no cover return out_dict if isinstance(in_obj, unicode): - return in_obj.encode('utf-8') + return in_obj.encode("utf-8") elif isinstance(in_obj, list): return encode_list(in_obj) elif isinstance(in_obj, tuple): @@ -114,17 +116,17 @@ if not PY3: # pragma: no cover __all__ = [ - 'PY3', - 'StringIO', - 'text_type', - 'binary_type', - 'BaseClass', - 'BaseHTTPRequestHandler', - 'quote', - 'quote_plus', - 'urlencode', - 'urlunsplit', - 'urlsplit', - 'parse_qs', - 'ClassTypes', + "PY3", + "StringIO", + "text_type", + "binary_type", + "BaseClass", + "BaseHTTPRequestHandler", + "quote", + "quote_plus", + "urlencode", + "urlunsplit", + "urlsplit", + "parse_qs", + "ClassTypes", ] diff --git a/httpretty/core.py b/httpretty/core.py index c847baa..bdd411e 100644 --- a/httpretty/core.py +++ b/httpretty/core.py @@ -57,19 +57,11 @@ from .compat import ( parse_qs, unquote_utf8, ClassTypes, - basestring -) -from .http import ( - STATUSES, - HttpBaseClass, - parse_requestline, - last_requestline, + basestring, ) +from .http import STATUSES, HttpBaseClass, parse_requestline, last_requestline -from .utils import ( - utf8, - decode_utf8, -) +from .utils import utf8, decode_utf8 from .errors import HTTPrettyError, UnmockedError @@ -89,18 +81,20 @@ old_sslwrap_simple = None old_sslsocket = None old_sslcontext_wrap_socket = None -MULTILINE_ANY_REGEX = re.compile(r'.*', re.M) -hostname_re = re.compile(r'\^?(?:https?://)?[^:/]*[:/]?') +MULTILINE_ANY_REGEX = re.compile(r".*", re.M) +hostname_re = re.compile(r"\^?(?:https?://)?[^:/]*[:/]?") try: # pragma: no cover import socks + old_socksocket = socks.socksocket except ImportError: socks = None try: # pragma: no cover import ssl + old_ssl_wrap_socket = ssl.wrap_socket try: old_sslcontext_wrap_socket = ssl.SSLContext.wrap_socket @@ -115,6 +109,7 @@ except ImportError: # pragma: no cover try: import requests.packages.urllib3.connection as requests_urllib3_connection + old_requests_ssl_wrap_socket = requests_urllib3_connection.ssl_wrap_socket except ImportError: requests_urllib3_connection = None @@ -163,7 +158,8 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass): """ - def __init__(self, headers, body=''): + + def __init__(self, headers, body=""): # first of all, lets make sure that if headers or body are # unicode strings, it must be converted into a utf-8 encoded # byte string @@ -172,7 +168,7 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass): # Now let's concatenate the headers with the body, and create # `rfile` based on it - self.rfile = StringIO(b'\r\n\r\n'.join([self.raw_headers, self.body])) + self.rfile = StringIO(b"\r\n\r\n".join([self.raw_headers, self.body])) # Creating `wfile` as an empty StringIO, just to avoid any # real I/O calls @@ -196,7 +192,7 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass): # `querystring` holds a dictionary with the parsed query string try: - self.path = self.path.encode('iso-8859-1') + self.path = self.path.encode("iso-8859-1") except UnicodeDecodeError: pass @@ -233,9 +229,7 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass): def __str__(self): tmpl = '<HTTPrettyRequest("{}", total_headers={}, body_length={})>' return tmpl.format( - self.headers.get('content-type', ''), - len(self.headers), - len(self.body), + self.headers.get("content-type", ""), len(self.headers), len(self.body) ) def parse_querystring(self, qs): @@ -262,12 +256,12 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass): """ PARSING_FUNCTIONS = { - 'application/json': json.loads, - 'text/json': json.loads, - 'application/x-www-form-urlencoded': self.parse_querystring, + "application/json": json.loads, + "text/json": json.loads, + "application/x-www-form-urlencoded": self.parse_querystring, } - content_type = self.headers.get('content-type', '') + content_type = self.headers.get("content-type", "") do_parse = PARSING_FUNCTIONS.get(content_type, FALLBACK_FUNCTION) try: @@ -290,7 +284,7 @@ class HTTPrettyRequestEmpty(object): method = None url = None - body = '' + body = "" headers = EmptyRequestHeaders() @@ -299,12 +293,13 @@ class FakeSockFile(object): a temporary file, giving it a real file descriptor number. """ + def __init__(self): self.file = tempfile.TemporaryFile() self._fileno = self.file.fileno() def getvalue(self): - if hasattr(self.file, 'getvalue'): + if hasattr(self.file, "getvalue"): return self.file.getvalue() else: return self.file.read() @@ -323,6 +318,7 @@ class FakeSockFile(object): class FakeSSLSocket(object): """Shorthand for :py:class:`~httpretty.core.fakesock` """ + def __init__(self, sock, *args, **kw): self._httpretty_sock = sock @@ -334,18 +330,23 @@ class fakesock(object): """ fake :py:mod:`socket` """ + class socket(object): """drop-in replacement for :py:class:`socket.socket` """ + _entry = None debuglevel = 0 _sent_data = [] - def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, - protocol=0, _sock=None): - self.truesock = (old_socket(family, type, protocol) - if httpretty.allow_net_connect - else None) + def __init__( + self, family=socket.AF_INET, type=socket.SOCK_STREAM, protocol=0, _sock=None + ): + self.truesock = ( + old_socket(family, type, protocol) + if httpretty.allow_net_connect + else None + ) self._connected_truesock = False self._closed = True self.fd = FakeSockFile() @@ -359,23 +360,16 @@ class fakesock(object): now = datetime.now() shift = now + timedelta(days=30 * 12) return { - 'notAfter': shift.strftime('%b %d %H:%M:%S GMT'), - 'subjectAltName': ( - ('DNS', '*.%s' % self._host), - ('DNS', self._host), - ('DNS', '*'), + "notAfter": shift.strftime("%b %d %H:%M:%S GMT"), + "subjectAltName": ( + ("DNS", "*.%s" % self._host), + ("DNS", self._host), + ("DNS", "*"), ), - 'subject': ( - ( - ('organizationName', '*.%s' % self._host), - ), - ( - ('organizationalUnitName', - 'Domain Control Validated'), - ), - ( - ('commonName', '*.%s' % self._host), - ), + "subject": ( + (("organizationName", "*.%s" % self._host),), + (("organizationalUnitName", "Domain Control Validated"),), + (("commonName", "*.%s" % self._host),), ), } @@ -398,8 +392,7 @@ class fakesock(object): # See issue #206 self.is_http = False else: - ports_to_check = ( - POTENTIAL_HTTP_PORTS.union(POTENTIAL_HTTPS_PORTS)) + ports_to_check = POTENTIAL_HTTP_PORTS.union(POTENTIAL_HTTPS_PORTS) self.is_http = self._port in ports_to_check if not self.is_http: @@ -425,7 +418,7 @@ class fakesock(object): self._connected_truesock = False self._closed = True - def makefile(self, mode='r', bufsize=-1): + def makefile(self, mode="r", bufsize=-1): """Returns this fake socket's own tempfile buffer. If there is an entry associated with the socket, the file @@ -436,9 +429,7 @@ class fakesock(object): self._bufsize = bufsize if self._entry: - t = threading.Thread( - target=self._entry.fill_filekind, args=(self.fd,) - ) + t = threading.Thread(target=self._entry.fill_filekind, args=(self.fd,)) t.start() if self.timeout == socket._GLOBAL_DEFAULT_TIMEOUT: timeout = None @@ -495,12 +486,11 @@ class fakesock(object): self.fd = FakeSockFile() self.fd.socket = self try: - requestline, _ = data.split(b'\r\n', 1) - method, path, version = parse_requestline( - decode_utf8(requestline)) + requestline, _ = data.split(b"\r\n", 1) + method, path, version = parse_requestline(decode_utf8(requestline)) is_parsing_headers = True except ValueError: - path = '' + path = "" is_parsing_headers = False if self._entry is None: @@ -518,8 +508,12 @@ class fakesock(object): headers = utf8(last_requestline(self._sent_data)) meta = self._entry.request.headers body = utf8(self._sent_data[-1]) - if meta.get('transfer-encoding', '') == 'chunked': - if not body.isdigit() and (body != b'\r\n') and (body != b'0\r\n\r\n'): + if meta.get("transfer-encoding", "") == "chunked": + if ( + not body.isdigit() + and (body != b"\r\n") + and (body != b"0\r\n\r\n") + ): self._entry.request.body += body else: self._entry.request.body += body @@ -530,11 +524,11 @@ class fakesock(object): # path might come with s = urlsplit(path) POTENTIAL_HTTP_PORTS.add(int(s.port or 80)) - parts = list(map(utf8, data.split(b'\r\n\r\n', 1))) + parts = list(map(utf8, data.split(b"\r\n\r\n", 1))) if len(parts) == 2: headers, body = parts else: - headers = '' + headers = "" body = data request = httpretty.historify_request(headers, body) @@ -544,7 +538,7 @@ class fakesock(object): port=self._port, path=s.path, query=s.query, - last_request=request + last_request=request, ) matcher, entries = httpretty.match_uriinfo(info) @@ -563,8 +557,10 @@ class fakesock(object): message = [ "HTTPretty intercepted and unexpected socket method call.", - ("Please open an issue at " - "'https://github.com/gabrielfalcao/HTTPretty/issues'"), + ( + "Please open an issue at " + "'https://github.com/gabrielfalcao/HTTPretty/issues'" + ), "And paste the following traceback:\n", "".join(decode_utf8(lines)), ] @@ -577,22 +573,22 @@ class fakesock(object): self.timeout = new_timeout def send(self, *args, **kwargs): - return self.debug('send', *args, **kwargs) + return self.debug("send", *args, **kwargs) def sendto(self, *args, **kwargs): - return self.debug('sendto', *args, **kwargs) + return self.debug("sendto", *args, **kwargs) def recvfrom_into(self, *args, **kwargs): - return self.debug('recvfrom_into', *args, **kwargs) + return self.debug("recvfrom_into", *args, **kwargs) def recv_into(self, *args, **kwargs): - return self.debug('recv_into', *args, **kwargs) + return self.debug("recv_into", *args, **kwargs) def recvfrom(self, *args, **kwargs): - return self.debug('recvfrom', *args, **kwargs) + return self.debug("recvfrom", *args, **kwargs) def recv(self, *args, **kwargs): - return self.debug('recv', *args, **kwargs) + return self.debug("recv", *args, **kwargs) def __getattr__(self, name): if not self.truesock: @@ -603,21 +599,20 @@ class fakesock(object): def fake_wrap_socket(orig_wrap_socket_fn, *args, **kw): """drop-in replacement for py:func:`ssl.wrap_socket` """ - server_hostname = kw.get('server_hostname') + server_hostname = kw.get("server_hostname") if server_hostname is not None: matcher = httpretty.match_https_hostname(server_hostname) if matcher is None: - return orig_wrap_socket_fn(*args, **kw) - if 'sock' in kw: - return kw['sock'] + return orig_wrap_socket_fn(*args, **kw) + if "sock" in kw: + return kw["sock"] else: return args[0] def create_fake_connection( - address, - timeout=socket._GLOBAL_DEFAULT_TIMEOUT, - source_address=None): + address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None +): """drop-in replacement for :py:func:`socket.create_connection`""" s = fakesock.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT: @@ -630,19 +625,17 @@ def create_fake_connection( def fake_gethostbyname(host): """drop-in replacement for :py:func:`socket.gethostbyname`""" - return '127.0.0.1' + return "127.0.0.1" def fake_gethostname(): """drop-in replacement for :py:func:`socket.gethostname`""" - return 'localhost' + return "localhost" -def fake_getaddrinfo( - host, port, family=None, socktype=None, proto=None, flags=None): +def fake_getaddrinfo(host, port, family=None, socktype=None, proto=None, flags=None): """drop-in replacement for :py:func:`socket.getaddrinfo`""" - return [(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, - '', (host, port))] + return [(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", (host, port))] class Entry(BaseClass): @@ -661,12 +654,18 @@ class Entry(BaseClass): .. warning:: When using the ``forcing_headers`` option make sure to add the header ``Content-Length`` otherwise calls using :py:mod:`requests` will try to load the response endlessly. """ - def __init__(self, method, uri, body, - adding_headers=None, - forcing_headers=None, - status=200, - streaming=False, - **headers): + + def __init__( + self, + method, + uri, + body, + adding_headers=None, + forcing_headers=None, + status=200, + streaming=False, + **headers + ): self.method = method self.uri = uri @@ -685,7 +684,7 @@ class Entry(BaseClass): self.streaming = streaming if not streaming and not self.body_is_callable: - self.body_length = len(self.body or '') + self.body_length = len(self.body or "") else: self.body_length = 0 @@ -703,10 +702,9 @@ class Entry(BaseClass): """validates the body size with the value of the ``Content-Length`` header """ - content_length_keys = 'Content-Length', 'content-length' + content_length_keys = "Content-Length", "content-length" for key in content_length_keys: - got = self.adding_headers.get( - key, self.forcing_headers.get(key, None)) + got = self.adding_headers.get(key, self.forcing_headers.get(key, None)) if got is None: continue @@ -716,26 +714,21 @@ class Entry(BaseClass): igot = int(got) except (ValueError, TypeError): warnings.warn( - 'HTTPretty got to register the Content-Length header ' - 'with "%r" which is not a number' % got) + "HTTPretty got to register the Content-Length header " + 'with "%r" which is not a number' % got + ) return if igot and igot > self.body_length: raise HTTPrettyError( - 'HTTPretty got inconsistent parameters. The header ' + "HTTPretty got inconsistent parameters. The header " 'Content-Length you registered expects size "%d" but ' - 'the body you registered for that has actually length ' - '"%d".' % ( - igot, self.body_length, - ) + "the body you registered for that has actually length " + '"%d".' % (igot, self.body_length) ) def __str__(self): - return r'<Entry {} {} getting {}>'.format( - self.method, - self.uri, - self.status - ) + return r"<Entry {} {} getting {}>".format(self.method, self.uri, self.status) def normalize_headers(self, headers): """Normalize keys in header names so that ``COntent-tyPe`` becomes ``content-type`` @@ -746,7 +739,7 @@ class Entry(BaseClass): """ new = {} for k in headers: - new_k = '-'.join([s.lower() for s in k.split('-')]) + new_k = "-".join([s.lower() for s in k.split("-")]) new[new_k] = headers[k] return new @@ -761,62 +754,54 @@ class Entry(BaseClass): now = datetime.utcnow() headers = { - 'status': self.status, - 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'), - 'server': 'Python/HTTPretty', - 'connection': 'close', + "status": self.status, + "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"), + "server": "Python/HTTPretty", + "connection": "close", } if self.forcing_headers: headers = self.forcing_headers if self.adding_headers: - headers.update( - self.normalize_headers( - self.adding_headers)) + headers.update(self.normalize_headers(self.adding_headers)) headers = self.normalize_headers(headers) - status = headers.get('status', self.status) + status = headers.get("status", self.status) if self.body_is_callable: - status, headers, self.body = self.callable_body(self.request, self.info.full_url(), headers) + status, headers, self.body = self.callable_body( + self.request, self.info.full_url(), headers + ) headers = self.normalize_headers(headers) # TODO: document this behavior: - if 'content-length' not in headers: - headers.update({ - 'content-length': len(self.body) - }) + if "content-length" not in headers: + headers.update({"content-length": len(self.body)}) - string_list = [ - 'HTTP/1.1 %d %s' % (status, STATUSES[status]), - ] + string_list = ["HTTP/1.1 %d %s" % (status, STATUSES[status])] - if 'date' in headers: - string_list.append('date: %s' % headers.pop('date')) + if "date" in headers: + string_list.append("date: %s" % headers.pop("date")) if not self.forcing_headers: - content_type = headers.pop('content-type', - 'text/plain; charset=utf-8') + content_type = headers.pop("content-type", "text/plain; charset=utf-8") - content_length = headers.pop('content-length', - self.body_length) + content_length = headers.pop("content-length", self.body_length) - string_list.append('content-type: %s' % content_type) + string_list.append("content-type: %s" % content_type) if not self.streaming: - string_list.append('content-length: %s' % content_length) + string_list.append("content-length: %s" % content_length) - server = headers.pop('server', None) + server = headers.pop("server", None) if server: - string_list.append('server: %s' % server) + string_list.append("server: %s" % server) for k, v in headers.items(): - string_list.append( - '{}: {}'.format(k, v), - ) + string_list.append("{}: {}".format(k, v)) for item in string_list: - fk.write(utf8(item) + b'\n') + fk.write(utf8(item) + b"\n") - fk.write(b'\r\n') + fk.write(b"\r\n") if self.streaming: self.body, body = itertools.tee(self.body) @@ -832,11 +817,14 @@ def url_fix(s, charset=None): """escapes special characters """ if charset: - warnings.warn("{}.url_fix() charset argument is deprecated".format(__name__), DeprecationWarning) + warnings.warn( + "{}.url_fix() charset argument is deprecated".format(__name__), + DeprecationWarning, + ) scheme, netloc, path, querystring, fragment = urlsplit(s) - path = quote(path, b'/%') - querystring = quote_plus(querystring, b':&=') + path = quote(path, b"/%") + querystring = quote_plus(querystring, b":&=") return urlunsplit((scheme, netloc, path, querystring, fragment)) @@ -855,67 +843,61 @@ class URIInfo(BaseClass): :param scheme: :param last_request: """ - default_str_attrs = ( - 'username', - 'password', - 'hostname', - 'port', - 'path', - ) - - def __init__(self, - username='', - password='', - hostname='', - port=80, - path='/', - query='', - fragment='', - scheme='', - last_request=None): - - self.username = username or '' - self.password = password or '' - self.hostname = hostname or '' + + default_str_attrs = ("username", "password", "hostname", "port", "path") + + def __init__( + self, + username="", + password="", + hostname="", + port=80, + path="/", + query="", + fragment="", + scheme="", + last_request=None, + ): + + self.username = username or "" + self.password = password or "" + self.hostname = hostname or "" if port: port = int(port) - elif scheme == 'https': + elif scheme == "https": port = 443 self.port = port or 80 - self.path = path or '' + self.path = path or "" if query: query_items = sorted(parse_qs(query).items()) - self.query = urlencode( - encode_obj(query_items), - doseq=True, - ) + self.query = urlencode(encode_obj(query_items), doseq=True) else: - self.query = '' + self.query = "" if scheme: self.scheme = scheme elif self.port in POTENTIAL_HTTPS_PORTS: - self.scheme = 'https' + self.scheme = "https" else: - self.scheme = 'http' - self.fragment = fragment or '' + self.scheme = "http" + self.fragment = fragment or "" self.last_request = last_request def to_str(self, attrs): - fmt = ", ".join(['%s="%s"' % (k, getattr(self, k, '')) for k in attrs]) - return r'<httpretty.URIInfo(%s)>' % fmt + fmt = ", ".join(['%s="%s"' % (k, getattr(self, k, "")) for k in attrs]) + return r"<httpretty.URIInfo(%s)>" % fmt def __str__(self): return self.to_str(self.default_str_attrs) def str_with_query(self): - attrs = self.default_str_attrs + ('query',) + attrs = self.default_str_attrs + ("query",) return self.to_str(attrs) def __hash__(self): - return int(hashlib.sha1(binary_type(self, 'ascii')).hexdigest(), 16) + return int(hashlib.sha1(binary_type(self, "ascii")).hexdigest(), 16) def __eq__(self, other): self_tuple = ( @@ -937,8 +919,7 @@ class URIInfo(BaseClass): """ credentials = "" if self.password: - credentials = "{}:{}@".format( - self.username, self.password) + credentials = "{}:{}@".format(self.username, self.password) query = "" if use_querystring and self.query: @@ -949,7 +930,7 @@ class URIInfo(BaseClass): credentials=credentials, domain=self.get_full_domain(), path=decode_utf8(self.path), - query=query + query=query, ) return result @@ -971,19 +952,21 @@ class URIInfo(BaseClass): :param entry: an instance of :py:class:`~httpretty.core.Entry` """ result = urlsplit(uri) - if result.scheme == 'https': + if result.scheme == "https": POTENTIAL_HTTPS_PORTS.add(int(result.port or 443)) else: POTENTIAL_HTTP_PORTS.add(int(result.port or 80)) - return cls(result.username, - result.password, - result.hostname, - result.port, - result.path, - result.query, - result.fragment, - result.scheme, - entry) + return cls( + result.username, + result.password, + result.hostname, + result.port, + result.path, + result.query, + result.fragment, + result.scheme, + entry, + ) class URIMatcher(object): @@ -993,13 +976,12 @@ class URIMatcher(object): def __init__(self, uri, entries, match_querystring=False, priority=0): self._match_querystring = match_querystring # CPython, Jython - regex_types = ('SRE_Pattern', 'org.python.modules.sre.PatternObject', - 'Pattern') + regex_types = ("SRE_Pattern", "org.python.modules.sre.PatternObject", "Pattern") is_regex = type(uri).__name__ in regex_types if is_regex: self.regex = uri result = urlsplit(uri.pattern) - if result.scheme == 'https': + if result.scheme == "https": POTENTIAL_HTTPS_PORTS.add(int(result.port or 443)) else: POTENTIAL_HTTP_PORTS.add(int(result.port or 80)) @@ -1015,13 +997,16 @@ class URIMatcher(object): def matches(self, info): if self.info: # Query string is not considered when comparing info objects, compare separately - return self.info == info and (not self._match_querystring or self.info.query == info.query) + return self.info == info and ( + not self._match_querystring or self.info.query == info.query + ) else: - return self.regex.search(info.full_url( - use_querystring=self._match_querystring)) + return self.regex.search( + info.full_url(use_querystring=self._match_querystring) + ) def __str__(self): - wrap = 'URLMatcher({})' + wrap = "URLMatcher({})" if self.info: if self._match_querystring: return wrap.format(text_type(self.info.str_with_query())) @@ -1045,8 +1030,7 @@ class URIMatcher(object): self.current_entries[method] = -1 if not self.entries or not entries_for_method: - raise ValueError('I have no entries for method %s: %s' - % (method, self)) + raise ValueError("I have no entries for method %s: %s" % (method, self)) entry = entries_for_method[self.current_entries[method]] if self.current_entries[method] != -1: @@ -1071,6 +1055,7 @@ class URIMatcher(object): class httpretty(HttpBaseClass): """manages HTTPretty's internal request/response registry and request matching. """ + _entries = {} latest_requests = [] @@ -1110,11 +1095,7 @@ class httpretty(HttpBaseClass): if matcher.info is None: pattern_with_port = "https://{0}:".format(hostname) pattern_without_port = "https://{0}/".format(hostname) - hostname_pattern = ( - hostname_re - .match(matcher.regex.pattern) - .group(0) - ) + hostname_pattern = hostname_re.match(matcher.regex.pattern).group(0) for pattern in [pattern_with_port, pattern_without_port]: if re.match(hostname_pattern, pattern): return matcher @@ -1138,30 +1119,25 @@ class httpretty(HttpBaseClass): for matcher, value in items: if matcher.info is None: if port in POTENTIAL_HTTPS_PORTS: - scheme = 'https://' + scheme = "https://" else: - scheme = 'http://' + scheme = "http://" pattern_without_port = "{0}{1}/".format(scheme, hostname) pattern_with_port = "{0}{1}:{2}/".format(scheme, hostname, port) - hostname_pattern = ( - hostname_re - .match(matcher.regex.pattern) - .group(0) - ) + hostname_pattern = hostname_re.match(matcher.regex.pattern).group(0) for pattern in [pattern_with_port, pattern_without_port]: if re.match(hostname_pattern, pattern): return matcher - elif matcher.info.hostname == hostname \ - and matcher.info.port == port: + elif matcher.info.hostname == hostname and matcher.info.port == port: return matcher return None @classmethod @contextlib.contextmanager - def record(cls, filename, indentation=4, encoding='utf-8'): + def record(cls, filename, indentation=4, encoding="utf-8"): """ .. testcode:: @@ -1186,8 +1162,7 @@ class httpretty(HttpBaseClass): import urllib3 except ImportError: msg = ( - 'HTTPretty requires urllib3 installed ' - 'for recording actual requests.' + "HTTPretty requires urllib3 installed " "for recording actual requests." ) raise RuntimeError(msg) @@ -1200,28 +1175,30 @@ class httpretty(HttpBaseClass): cls.disable() kw = {} - kw.setdefault('body', request.body) - kw.setdefault('headers', dict(request.headers)) + kw.setdefault("body", request.body) + kw.setdefault("headers", dict(request.headers)) response = http.request(request.method, uri, **kw) - calls.append({ - 'request': { - 'uri': uri, - 'method': request.method, - 'headers': dict(request.headers), - 'body': decode_utf8(request.body), - 'querystring': request.querystring - }, - 'response': { - 'status': response.status, - 'body': decode_utf8(response.data), - # urllib3 1.10 had a bug if you just did: - # dict(response.headers) - # which would cause all the values to become lists - # with the header name as the first item and the - # true value as the second item. Workaround that - 'headers': dict(response.headers.items()) + calls.append( + { + "request": { + "uri": uri, + "method": request.method, + "headers": dict(request.headers), + "body": decode_utf8(request.body), + "querystring": request.querystring, + }, + "response": { + "status": response.status, + "body": decode_utf8(response.data), + # urllib3 1.10 had a bug if you just did: + # dict(response.headers) + # which would cause all the values to become lists + # with the header name as the first item and the + # true value as the second item. Workaround that + "headers": dict(response.headers.items()), + }, } - }) + ) cls.enable() return response.status, response.headers, response.data @@ -1230,7 +1207,7 @@ class httpretty(HttpBaseClass): yield cls.disable() - with codecs.open(filename, 'w', encoding) as f: + with codecs.open(filename, "w", encoding) as f: f.write(json.dumps(calls, indent=indentation)) @classmethod @@ -1257,10 +1234,10 @@ class httpretty(HttpBaseClass): data = json.loads(open(filename).read()) for item in data: - uri = item['request']['uri'] - method = item['request']['method'] - body = item['response']['body'] - headers = item['response']['headers'] + uri = item["request"]["uri"] + method = item["request"]["method"] + body = item["response"]["body"] + headers = item["response"]["headers"] cls.register_uri(method, uri, body=body, forcing_headers=headers) yield @@ -1277,7 +1254,7 @@ class httpretty(HttpBaseClass): cls.last_request = HTTPrettyRequestEmpty() @classmethod - def historify_request(cls, headers, body='', append=True): + def historify_request(cls, headers, body="", append=True): """appends request to a list for later retrieval .. testcode:: @@ -1299,14 +1276,19 @@ class httpretty(HttpBaseClass): return request @classmethod - def register_uri(cls, method, uri, body='{"message": "HTTPretty :)"}', - adding_headers=None, - forcing_headers=None, - status=200, - responses=None, - match_querystring=False, - priority=0, - **headers): + def register_uri( + cls, + method, + uri, + body='{"message": "HTTPretty :)"}', + adding_headers=None, + forcing_headers=None, + status=200, + responses=None, + match_querystring=False, + priority=0, + **headers + ): """ .. testcode:: @@ -1343,8 +1325,8 @@ class httpretty(HttpBaseClass): """ uri_is_string = isinstance(uri, basestring) - if uri_is_string and re.search(r'^\w+://[^/]+[.]\w{2,}$', uri): - uri += '/' + if uri_is_string and re.search(r"^\w+://[^/]+[.]\w{2,}$", uri): + uri += "/" if isinstance(responses, list) and len(responses) > 0: for response in responses: @@ -1352,17 +1334,14 @@ class httpretty(HttpBaseClass): response.method = method entries_for_this_uri = responses else: - headers[str('body')] = body - headers[str('adding_headers')] = adding_headers - headers[str('forcing_headers')] = forcing_headers - headers[str('status')] = status + headers[str("body")] = body + headers[str("adding_headers")] = adding_headers + headers[str("forcing_headers")] = forcing_headers + headers[str("status")] = status - entries_for_this_uri = [ - cls.Response(method=method, uri=uri, **headers), - ] + entries_for_this_uri = [cls.Response(method=method, uri=uri, **headers)] - matcher = URIMatcher(uri, entries_for_this_uri, - match_querystring, priority) + matcher = URIMatcher(uri, entries_for_this_uri, match_querystring, priority) if matcher in cls._entries: matcher.entries.extend(cls._entries[matcher]) del cls._entries[matcher] @@ -1370,18 +1349,20 @@ class httpretty(HttpBaseClass): cls._entries[matcher] = entries_for_this_uri def __str__(self): - return '<HTTPretty with %d URI entries>' % len(self._entries) + return "<HTTPretty with %d URI entries>" % len(self._entries) @classmethod def Response( - cls, body, - method=None, - uri=None, - adding_headers=None, - forcing_headers=None, - status=200, - streaming=False, - **kw): + cls, + body, + method=None, + uri=None, + adding_headers=None, + forcing_headers=None, + status=200, + streaming=False, + **kw + ): """ shortcut to create an :py:class:`~httpretty.core.Entry` that takes the body as first positional argument @@ -1396,11 +1377,11 @@ class httpretty(HttpBaseClass): :param kw: keyword-arguments passed onto the :py:class:`~httpretty.core.Entry` :returns: an :py:class:`~httpretty.core.Entry` """ - kw['body'] = body - kw['adding_headers'] = adding_headers - kw['forcing_headers'] = forcing_headers - kw['status'] = int(status) - kw['streaming'] = streaming + kw["body"] = body + kw["adding_headers"] = adding_headers + kw["forcing_headers"] = forcing_headers + kw["status"] = int(status) + kw["streaming"] = streaming return Entry(method, uri, **kw) @classmethod @@ -1434,18 +1415,18 @@ class httpretty(HttpBaseClass): socket.gethostbyname = old_gethostbyname socket.getaddrinfo = old_getaddrinfo - socket.__dict__['socket'] = old_socket - socket.__dict__['_socketobject'] = old_socket - socket.__dict__['SocketType'] = old_SocketType + socket.__dict__["socket"] = old_socket + socket.__dict__["_socketobject"] = old_socket + socket.__dict__["SocketType"] = old_SocketType - socket.__dict__['create_connection'] = old_create_connection - socket.__dict__['gethostname'] = old_gethostname - socket.__dict__['gethostbyname'] = old_gethostbyname - socket.__dict__['getaddrinfo'] = old_getaddrinfo + socket.__dict__["create_connection"] = old_create_connection + socket.__dict__["gethostname"] = old_gethostname + socket.__dict__["gethostbyname"] = old_gethostbyname + socket.__dict__["getaddrinfo"] = old_getaddrinfo if socks: socks.socksocket = old_socksocket - socks.__dict__['socksocket'] = old_socksocket + socks.__dict__["socksocket"] = old_socksocket if ssl: ssl.wrap_socket = old_ssl_wrap_socket @@ -1454,18 +1435,18 @@ class httpretty(HttpBaseClass): ssl.SSLContext.wrap_socket = old_sslcontext_wrap_socket except AttributeError: pass - ssl.__dict__['wrap_socket'] = old_ssl_wrap_socket - ssl.__dict__['SSLSocket'] = old_sslsocket + ssl.__dict__["wrap_socket"] = old_ssl_wrap_socket + ssl.__dict__["SSLSocket"] = old_sslsocket if not PY3: ssl.sslwrap_simple = old_sslwrap_simple - ssl.__dict__['sslwrap_simple'] = old_sslwrap_simple + ssl.__dict__["sslwrap_simple"] = old_sslwrap_simple if requests_urllib3_connection is not None: - requests_urllib3_connection.ssl_wrap_socket = \ - old_requests_ssl_wrap_socket - requests_urllib3_connection.__dict__['ssl_wrap_socket'] = \ - old_requests_ssl_wrap_socket + requests_urllib3_connection.ssl_wrap_socket = old_requests_ssl_wrap_socket + requests_urllib3_connection.__dict__[ + "ssl_wrap_socket" + ] = old_requests_ssl_wrap_socket @classmethod def is_enabled(cls): @@ -1516,7 +1497,7 @@ class httpretty(HttpBaseClass): cls._is_enabled = True # Some versions of python internally shadowed the # SocketType variable incorrectly https://bugs.python.org/issue20386 - bad_socket_shadow = (socket.socket != socket.SocketType) + bad_socket_shadow = socket.socket != socket.SocketType socket.socket = fakesock.socket socket._socketobject = fakesock.socket @@ -1528,40 +1509,42 @@ class httpretty(HttpBaseClass): socket.gethostbyname = fake_gethostbyname socket.getaddrinfo = fake_getaddrinfo - socket.__dict__['socket'] = fakesock.socket - socket.__dict__['_socketobject'] = fakesock.socket + socket.__dict__["socket"] = fakesock.socket + socket.__dict__["_socketobject"] = fakesock.socket if not bad_socket_shadow: - socket.__dict__['SocketType'] = fakesock.socket + socket.__dict__["SocketType"] = fakesock.socket - socket.__dict__['create_connection'] = create_fake_connection - socket.__dict__['gethostname'] = fake_gethostname - socket.__dict__['gethostbyname'] = fake_gethostbyname - socket.__dict__['getaddrinfo'] = fake_getaddrinfo + socket.__dict__["create_connection"] = create_fake_connection + socket.__dict__["gethostname"] = fake_gethostname + socket.__dict__["gethostbyname"] = fake_gethostbyname + socket.__dict__["getaddrinfo"] = fake_getaddrinfo if socks: socks.socksocket = fakesock.socket - socks.__dict__['socksocket'] = fakesock.socket + socks.__dict__["socksocket"] = fakesock.socket if ssl: new_wrap = partial(fake_wrap_socket, old_ssl_wrap_socket) ssl.wrap_socket = new_wrap ssl.SSLSocket = FakeSSLSocket try: - ssl.SSLContext.wrap_socket = partial(fake_wrap_socket, old_sslcontext_wrap_socket) + ssl.SSLContext.wrap_socket = partial( + fake_wrap_socket, old_sslcontext_wrap_socket + ) except AttributeError: pass - ssl.__dict__['wrap_socket'] = new_wrap - ssl.__dict__['SSLSocket'] = FakeSSLSocket + ssl.__dict__["wrap_socket"] = new_wrap + ssl.__dict__["SSLSocket"] = FakeSSLSocket if not PY3: ssl.sslwrap_simple = new_wrap - ssl.__dict__['sslwrap_simple'] = new_wrap + ssl.__dict__["sslwrap_simple"] = new_wrap if requests_urllib3_connection is not None: new_wrap = partial(fake_wrap_socket, old_requests_ssl_wrap_socket) requests_urllib3_connection.ssl_wrap_socket = new_wrap - requests_urllib3_connection.__dict__['ssl_wrap_socket'] = new_wrap + requests_urllib3_connection.__dict__["ssl_wrap_socket"] = new_wrap class httprettized(object): @@ -1579,6 +1562,7 @@ class httprettized(object): assert httpretty.latest_requests[-1].url == 'https://httpbin.org/ip' assert response.json() == {'origin': '42.42.42.42'} """ + def __init__(self, allow_net_connect=True): self.allow_net_connect = allow_net_connect @@ -1637,15 +1621,14 @@ def httprettified(test=None, allow_net_connect=True): }) """ + def decorate_unittest_TestCase_setUp(klass): # Prefer addCleanup (added in python 2.7), but fall back # to using tearDown if it isn't available - use_addCleanup = hasattr(klass, 'addCleanup') + use_addCleanup = hasattr(klass, "addCleanup") - original_setUp = (klass.setUp - if hasattr(klass, 'setUp') - else None) + original_setUp = klass.setUp if hasattr(klass, "setUp") else None def new_setUp(self): httpretty.reset() @@ -1654,25 +1637,25 @@ def httprettified(test=None, allow_net_connect=True): self.addCleanup(httpretty.disable) if original_setUp: original_setUp(self) + klass.setUp = new_setUp if not use_addCleanup: - original_tearDown = (klass.setUp - if hasattr(klass, 'tearDown') - else None) + original_tearDown = klass.setUp if hasattr(klass, "tearDown") else None def new_tearDown(self): httpretty.disable() httpretty.reset() if original_tearDown: original_tearDown(self) + klass.tearDown = new_tearDown return klass def decorate_test_methods(klass): for attr in dir(klass): - if not attr.startswith('test_'): + if not attr.startswith("test_"): continue attr_value = getattr(klass, attr) @@ -1685,11 +1668,13 @@ def httprettified(test=None, allow_net_connect=True): def is_unittest_TestCase(klass): try: import unittest + return issubclass(klass, unittest.TestCase) except ImportError: return False "A decorator for tests that use HTTPretty" + def decorate_class(klass): if is_unittest_TestCase(klass): return decorate_unittest_TestCase_setUp(klass) @@ -1700,6 +1685,7 @@ def httprettified(test=None, allow_net_connect=True): def wrapper(*args, **kw): with httprettized(allow_net_connect): return test(*args, **kw) + return wrapper if isinstance(test, ClassTypes): diff --git a/httpretty/errors.py b/httpretty/errors.py index c69a438..1f1b94f 100644 --- a/httpretty/errors.py +++ b/httpretty/errors.py @@ -34,6 +34,6 @@ class HTTPrettyError(Exception): class UnmockedError(HTTPrettyError): def __init__(self): super(UnmockedError, self).__init__( - 'No mocking was registered, and real connections are ' - 'not allowed (httpretty.allow_net_connect = False).' + "No mocking was registered, and real connections are " + "not allowed (httpretty.allow_net_connect = False)." ) diff --git a/httpretty/http.py b/httpretty/http.py index 79dac73..7e323ac 100644 --- a/httpretty/http.py +++ b/httpretty/http.py @@ -108,14 +108,14 @@ STATUSES = { class HttpBaseClass(BaseClass): - GET = 'GET' - PUT = 'PUT' - POST = 'POST' - DELETE = 'DELETE' - HEAD = 'HEAD' - PATCH = 'PATCH' - OPTIONS = 'OPTIONS' - CONNECT = 'CONNECT' + GET = "GET" + PUT = "PUT" + POST = "POST" + DELETE = "DELETE" + HEAD = "HEAD" + PATCH = "PATCH" + OPTIONS = "OPTIONS" + CONNECT = "CONNECT" METHODS = (GET, PUT, POST, DELETE, HEAD, PATCH, OPTIONS, CONNECT) @@ -132,12 +132,12 @@ def parse_requestline(s): ... ValueError: Not a Request-Line """ - methods = '|'.join(HttpBaseClass.METHODS) - m = re.match(r'(' + methods + r')\s+(.*)\s+HTTP/(1.[0|1])', s, re.I) + methods = "|".join(HttpBaseClass.METHODS) + m = re.match(r"(" + methods + r")\s+(.*)\s+HTTP/(1.[0|1])", s, re.I) if m: return m.group(1).upper(), m.group(2), m.group(3) else: - raise ValueError('Not a Request-Line') + raise ValueError("Not a Request-Line") def last_requestline(sent_data): diff --git a/httpretty/utils.py b/httpretty/utils.py index c203796..5c0fa13 100644 --- a/httpretty/utils.py +++ b/httpretty/utils.py @@ -25,14 +25,12 @@ # OTHER DEALINGS IN THE SOFTWARE. from __future__ import unicode_literals -from .compat import ( - binary_type, text_type -) +from .compat import binary_type, text_type def utf8(s): if isinstance(s, text_type): - s = s.encode('utf-8') + s = s.encode("utf-8") elif s is None: return binary_type() diff --git a/httpretty/version.py b/httpretty/version.py index cd558f2..71353ea 100644 --- a/httpretty/version.py +++ b/httpretty/version.py @@ -1 +1 @@ -version = '0.9.5' +version = "0.9.5" @@ -31,44 +31,44 @@ from setuptools import setup, find_packages def read_version(): ctx = {} - exec(local_file('httpretty', 'version.py'), ctx) - return ctx['version'] + exec(local_file("httpretty", "version.py"), ctx) + return ctx["version"] -local_file = lambda *f: \ - io.open( - os.path.join(os.path.dirname(__file__), *f), encoding='utf-8').read() +local_file = lambda *f: io.open( + os.path.join(os.path.dirname(__file__), *f), encoding="utf-8" +).read() -install_requires = ['six'] -tests_requires = ['nose', 'sure', 'coverage', 'mock', 'rednose'] +install_requires = ["six"] +tests_requires = ["nose", "sure", "coverage", "mock", "rednose"] setup( - name='httpretty', + name="httpretty", version=read_version(), - description='HTTP client mock for Python', - long_description=local_file('README.rst'), - author='Gabriel Falcao', - author_email='gabriel@nacaolivre.org', - url='https://httpretty.readthedocs.io', + description="HTTP client mock for Python", + long_description=local_file("README.rst"), + author="Gabriel Falcao", + author_email="gabriel@nacaolivre.org", + url="https://httpretty.readthedocs.io", zip_safe=False, - packages=find_packages(exclude=['*tests*']), + packages=find_packages(exclude=["*tests*"]), tests_require=tests_requires, install_requires=install_requires, - license='MIT', - test_suite='nose.collector', + license="MIT", + test_suite="nose.collector", classifiers=[ - 'Development Status :: 5 - Production/Stable', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python', - 'Topic :: Internet :: WWW/HTTP', - 'Topic :: Software Development :: Testing' + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python", + "Topic :: Internet :: WWW/HTTP", + "Topic :: Software Development :: Testing", ], ) diff --git a/tests/functional/__init__.py b/tests/functional/__init__.py index 4eaed8c..ad1391a 100644 --- a/tests/functional/__init__.py +++ b/tests/functional/__init__.py @@ -25,4 +25,5 @@ # OTHER DEALINGS IN THE SOFTWARE. import warnings -warnings.simplefilter('ignore') + +warnings.simplefilter("ignore") diff --git a/tests/functional/base.py b/tests/functional/base.py index 61bd89f..884c483 100644 --- a/tests/functional/base.py +++ b/tests/functional/base.py @@ -44,28 +44,33 @@ def get_free_tcp_port(): """returns a TCP port that can be used for listen in the host. """ tcp = old_socket(socket.AF_INET, socket.SOCK_STREAM) - tcp.bind(('', 0)) + tcp.bind(("", 0)) host, port = tcp.getsockname() tcp.close() return port LOCAL_FILE = lambda *path: join(abspath(dirname(__file__)), *path) -FIXTURE_FILE = lambda name: LOCAL_FILE('fixtures', name) +FIXTURE_FILE = lambda name: LOCAL_FILE("fixtures", name) class JSONEchoHandler(tornado.web.RequestHandler): def get(self, matched): payload = dict([(x, self.get_argument(x)) for x in self.request.arguments]) - self.write(json.dumps({matched or 'index': payload}, indent=4)) + self.write(json.dumps({matched or "index": payload}, indent=4)) def post(self, matched): payload = dict(self.request.arguments) - self.write(json.dumps({ - matched or 'index': payload, - 'req_body': self.request.body.decode('utf-8'), - 'req_headers': dict(self.request.headers.items()), - }, indent=4)) + self.write( + json.dumps( + { + matched or "index": payload, + "req_body": self.request.body.decode("utf-8"), + "req_headers": dict(self.request.headers.items()), + }, + indent=4, + ) + ) class JSONEchoServer(threading.Thread): @@ -83,9 +88,7 @@ class JSONEchoServer(threading.Thread): return self._stop.isSet() def setup_application(self): - return tornado.web.Application([ - (r"/(.*)", JSONEchoHandler), - ]) + return tornado.web.Application([(r"/(.*)", JSONEchoHandler)]) def run(self): loop = tornado.ioloop.IOLoop() @@ -102,9 +105,9 @@ def use_tornado_server(callback): @wraps(callback) def func(*args, **kw): - port = os.getenv('TEST_PORT', get_free_tcp_port()) + port = os.getenv("TEST_PORT", get_free_tcp_port()) POTENTIAL_HTTP_PORTS.add(port) - kw['port'] = port + kw["port"] = port server = JSONEchoServer(lock, port) server.start() try: @@ -115,4 +118,5 @@ def use_tornado_server(callback): server.stop() if port in POTENTIAL_HTTP_PORTS: POTENTIAL_HTTP_PORTS.remove(port) + return func diff --git a/tests/functional/test_bypass.py b/tests/functional/test_bypass.py index 6446fe9..bd31454 100644 --- a/tests/functional/test_bypass.py +++ b/tests/functional/test_bypass.py @@ -27,6 +27,7 @@ from __future__ import unicode_literals import time import requests + try: import urllib.request as urllib2 except ImportError: @@ -58,7 +59,7 @@ def start_http_server(context): httpretty.disable() time.sleep(.1) try: - requests.get('http://localhost:{}/'.format(context.http_port)) + requests.get("http://localhost:{}/".format(context.http_port)) ready = True except (Exception, BaseException): if time.time() - started_at >= timeout: @@ -92,31 +93,38 @@ def test_httpretty_bypasses_when_disabled(context): "httpretty should bypass all requests by disabling it" httpretty.register_uri( - httpretty.GET, "http://localhost:{}/go-for-bubbles/".format(context.http_port), - body="glub glub") + httpretty.GET, + "http://localhost:{}/go-for-bubbles/".format(context.http_port), + body="glub glub", + ) httpretty.disable() - fd = urllib2.urlopen('http://localhost:{}/go-for-bubbles/'.format(context.http_port)) + fd = urllib2.urlopen( + "http://localhost:{}/go-for-bubbles/".format(context.http_port) + ) got1 = fd.read() fd.close() expect(got1).to.equal( - b'. o O 0 O o . o O 0 O o . o O 0 O o . o O 0 O o . o O 0 O o .') + b". o O 0 O o . o O 0 O o . o O 0 O o . o O 0 O o . o O 0 O o ." + ) - fd = urllib2.urlopen('http://localhost:{}/come-again/'.format(context.http_port)) + fd = urllib2.urlopen("http://localhost:{}/come-again/".format(context.http_port)) got2 = fd.read() fd.close() - expect(got2).to.equal(b'<- HELLO WORLD ->') + expect(got2).to.equal(b"<- HELLO WORLD ->") httpretty.enable() - fd = urllib2.urlopen('http://localhost:{}/go-for-bubbles/'.format(context.http_port)) + fd = urllib2.urlopen( + "http://localhost:{}/go-for-bubbles/".format(context.http_port) + ) got3 = fd.read() fd.close() - expect(got3).to.equal(b'glub glub') + expect(got3).to.equal(b"glub glub") core.POTENTIAL_HTTP_PORTS.remove(context.http_port) @@ -126,20 +134,24 @@ def test_httpretty_bypasses_a_unregistered_request(context): "httpretty should bypass a unregistered request by disabling it" httpretty.register_uri( - httpretty.GET, "http://localhost:{}/go-for-bubbles/".format(context.http_port), - body="glub glub") - - fd = urllib2.urlopen('http://localhost:{}/go-for-bubbles/'.format(context.http_port)) + httpretty.GET, + "http://localhost:{}/go-for-bubbles/".format(context.http_port), + body="glub glub", + ) + + fd = urllib2.urlopen( + "http://localhost:{}/go-for-bubbles/".format(context.http_port) + ) got1 = fd.read() fd.close() - expect(got1).to.equal(b'glub glub') + expect(got1).to.equal(b"glub glub") - fd = urllib2.urlopen('http://localhost:{}/come-again/'.format(context.http_port)) + fd = urllib2.urlopen("http://localhost:{}/come-again/".format(context.http_port)) got2 = fd.read() fd.close() - expect(got2).to.equal(b'<- HELLO WORLD ->') + expect(got2).to.equal(b"<- HELLO WORLD ->") core.POTENTIAL_HTTP_PORTS.remove(context.http_port) @@ -148,15 +160,13 @@ def test_httpretty_bypasses_a_unregistered_request(context): def test_using_httpretty_with_other_tcp_protocols(context): "httpretty should work even when testing code that also use other TCP-based protocols" - httpretty.register_uri( - httpretty.GET, "http://falcao.it/foo/", - body="BAR") + httpretty.register_uri(httpretty.GET, "http://falcao.it/foo/", body="BAR") - fd = urllib2.urlopen('http://falcao.it/foo/') + fd = urllib2.urlopen("http://falcao.it/foo/") got1 = fd.read() fd.close() - expect(got1).to.equal(b'BAR') + expect(got1).to.equal(b"BAR") expect(context.client.send("foobar")).to.equal(b"RECEIVED: foobar") @@ -168,13 +178,14 @@ def test_disallow_net_connect_1(context): When allow_net_connect = False, a request that otherwise would have worked results in UnmockedError. """ - httpretty.register_uri(httpretty.GET, "http://falcao.it/foo/", - body="BAR") + httpretty.register_uri(httpretty.GET, "http://falcao.it/foo/", body="BAR") def foo(): fd = None try: - fd = urllib2.urlopen('http://localhost:{}/go-for-bubbles/'.format(context.http_port)) + fd = urllib2.urlopen( + "http://localhost:{}/go-for-bubbles/".format(context.http_port) + ) finally: if fd: fd.close() @@ -192,7 +203,7 @@ def test_disallow_net_connect_2(): def foo(): fd = None try: - fd = urllib2.urlopen('http://example.com/nonsense') + fd = urllib2.urlopen("http://example.com/nonsense") finally: if fd: fd.close() @@ -204,9 +215,8 @@ def test_disallow_net_connect_2(): def test_disallow_net_connect_3(): "When allow_net_connect = False, mocked requests still work correctly." - httpretty.register_uri(httpretty.GET, "http://falcao.it/foo/", - body="BAR") - fd = urllib2.urlopen('http://falcao.it/foo/') + httpretty.register_uri(httpretty.GET, "http://falcao.it/foo/", body="BAR") + fd = urllib2.urlopen("http://falcao.it/foo/") got1 = fd.read() fd.close() - expect(got1).to.equal(b'BAR') + expect(got1).to.equal(b"BAR") diff --git a/tests/functional/test_debug.py b/tests/functional/test_debug.py index 995da8d..577038e 100644 --- a/tests/functional/test_debug.py +++ b/tests/functional/test_debug.py @@ -31,11 +31,7 @@ from httpretty import httprettified def create_socket(context): - context.sock = socket.socket( - socket.AF_INET, - socket.SOCK_STREAM, - socket.IPPROTO_TCP, - ) + context.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) context.sock.is_http = True @@ -45,8 +41,7 @@ def test_httpretty_debugs_socket_send(context): "HTTPretty should debug socket.send" expect(context.sock.send).when.called.to.throw( - RuntimeError, - "HTTPretty intercepted and unexpected socket method call." + RuntimeError, "HTTPretty intercepted and unexpected socket method call." ) @@ -56,8 +51,7 @@ def test_httpretty_debugs_socket_sendto(context): "HTTPretty should debug socket.sendto" expect(context.sock.sendto).when.called.to.throw( - RuntimeError, - "HTTPretty intercepted and unexpected socket method call." + RuntimeError, "HTTPretty intercepted and unexpected socket method call." ) @@ -67,8 +61,7 @@ def test_httpretty_debugs_socket_recv(context): "HTTPretty should debug socket.recv" expect(context.sock.recv).when.called.to.throw( - RuntimeError, - "HTTPretty intercepted and unexpected socket method call." + RuntimeError, "HTTPretty intercepted and unexpected socket method call." ) @@ -78,8 +71,7 @@ def test_httpretty_debugs_socket_recvfrom(context): "HTTPretty should debug socket.recvfrom" expect(context.sock.recvfrom).when.called.to.throw( - RuntimeError, - "HTTPretty intercepted and unexpected socket method call." + RuntimeError, "HTTPretty intercepted and unexpected socket method call." ) @@ -89,8 +81,7 @@ def test_httpretty_debugs_socket_recv_into(context): "HTTPretty should debug socket.recv_into" expect(context.sock.recv_into).when.called.to.throw( - RuntimeError, - "HTTPretty intercepted and unexpected socket method call." + RuntimeError, "HTTPretty intercepted and unexpected socket method call." ) @@ -100,6 +91,5 @@ def test_httpretty_debugs_socket_recvfrom_into(context): "HTTPretty should debug socket.recvfrom_into" expect(context.sock.recvfrom_into).when.called.to.throw( - RuntimeError, - "HTTPretty intercepted and unexpected socket method call." + RuntimeError, "HTTPretty intercepted and unexpected socket method call." ) diff --git a/tests/functional/test_decorator.py b/tests/functional/test_decorator.py index d1f156a..5834d33 100644 --- a/tests/functional/test_decorator.py +++ b/tests/functional/test_decorator.py @@ -11,34 +11,30 @@ except ImportError: @httprettified def test_decor(): - HTTPretty.register_uri( - HTTPretty.GET, "http://localhost/", - body="glub glub") + HTTPretty.register_uri(HTTPretty.GET, "http://localhost/", body="glub glub") - fd = urllib2.urlopen('http://localhost/') + fd = urllib2.urlopen("http://localhost/") got1 = fd.read() fd.close() - expect(got1).to.equal(b'glub glub') + expect(got1).to.equal(b"glub glub") @httprettified class DecoratedNonUnitTest(object): - def test_fail(self): - raise AssertionError('Tests in this class should not ' - 'be executed by the test runner.') + raise AssertionError( + "Tests in this class should not " "be executed by the test runner." + ) def test_decorated(self): - HTTPretty.register_uri( - HTTPretty.GET, "http://localhost/", - body="glub glub") + HTTPretty.register_uri(HTTPretty.GET, "http://localhost/", body="glub glub") - fd = urllib2.urlopen('http://localhost/') + fd = urllib2.urlopen("http://localhost/") got1 = fd.read() fd.close() - expect(got1).to.equal(b'glub glub') + expect(got1).to.equal(b"glub glub") class NonUnitTestTest(TestCase): @@ -52,65 +48,61 @@ class NonUnitTestTest(TestCase): @httprettified class ClassDecorator(TestCase): - def test_decorated(self): - HTTPretty.register_uri( - HTTPretty.GET, "http://localhost/", - body="glub glub") + HTTPretty.register_uri(HTTPretty.GET, "http://localhost/", body="glub glub") - fd = urllib2.urlopen('http://localhost/') + fd = urllib2.urlopen("http://localhost/") got1 = fd.read() fd.close() - expect(got1).to.equal(b'glub glub') + expect(got1).to.equal(b"glub glub") def test_decorated2(self): - HTTPretty.register_uri( - HTTPretty.GET, "http://localhost/", - body="buble buble") + HTTPretty.register_uri(HTTPretty.GET, "http://localhost/", body="buble buble") - fd = urllib2.urlopen('http://localhost/') + fd = urllib2.urlopen("http://localhost/") got1 = fd.read() fd.close() - expect(got1).to.equal(b'buble buble') + expect(got1).to.equal(b"buble buble") @httprettified class ClassDecoratorWithSetUp(TestCase): - def setUp(self): HTTPretty.register_uri( - HTTPretty.GET, "http://localhost/", + HTTPretty.GET, + "http://localhost/", responses=[ HTTPretty.Response("glub glub"), HTTPretty.Response("buble buble"), - ]) + ], + ) def test_decorated(self): - fd = urllib2.urlopen('http://localhost/') + fd = urllib2.urlopen("http://localhost/") got1 = fd.read() fd.close() - expect(got1).to.equal(b'glub glub') + expect(got1).to.equal(b"glub glub") - fd = urllib2.urlopen('http://localhost/') + fd = urllib2.urlopen("http://localhost/") got2 = fd.read() fd.close() - expect(got2).to.equal(b'buble buble') + expect(got2).to.equal(b"buble buble") def test_decorated2(self): - fd = urllib2.urlopen('http://localhost/') + fd = urllib2.urlopen("http://localhost/") got1 = fd.read() fd.close() - expect(got1).to.equal(b'glub glub') + expect(got1).to.equal(b"glub glub") - fd = urllib2.urlopen('http://localhost/') + fd = urllib2.urlopen("http://localhost/") got2 = fd.read() fd.close() - expect(got2).to.equal(b'buble buble') + expect(got2).to.equal(b"buble buble") diff --git a/tests/functional/test_fakesocket.py b/tests/functional/test_fakesocket.py index 215d074..edacd77 100644 --- a/tests/functional/test_fakesocket.py +++ b/tests/functional/test_fakesocket.py @@ -36,6 +36,7 @@ class FakeSocket(socket.socket): Just an editable socket factory It allows mock to patch readonly functions """ + connect = sendall = lambda *args, **kw: None @@ -50,26 +51,24 @@ def recv(flag, size): the asked size passed in argument. Any further call will just raise RuntimeError """ - if 'was_here' in flag: - raise RuntimeError('Already sent everything') + if "was_here" in flag: + raise RuntimeError("Already sent everything") else: - flag['was_here'] = None - return 'a' * (size - 1) + flag["was_here"] = None + return "a" * (size - 1) recv = functools.partial(recv, fake_socket_interupter_flag) -@mock.patch('httpretty.old_socket', new=FakeSocket) +@mock.patch("httpretty.old_socket", new=FakeSocket) def _test_shorten_response(): u"HTTPretty shouldn't try to read from server when communication is over" from sure import expect import httpretty - fakesocket = httpretty.fakesock.socket(socket.AF_INET, - socket.SOCK_STREAM) - with mock.patch.object(fakesocket.truesock, 'recv', recv): - fakesocket.connect(('localhost', 80)) - fakesocket._true_sendall('WHATEVER') - expect(fakesocket.fd.read()).to.equal( - 'a' * (httpretty.socket_buffer_size - 1)) + fakesocket = httpretty.fakesock.socket(socket.AF_INET, socket.SOCK_STREAM) + with mock.patch.object(fakesocket.truesock, "recv", recv): + fakesocket.connect(("localhost", 80)) + fakesocket._true_sendall("WHATEVER") + expect(fakesocket.fd.read()).to.equal("a" * (httpretty.socket_buffer_size - 1)) diff --git a/tests/functional/test_httplib2.py b/tests/functional/test_httplib2.py index 9aa6a5b..8d4f947 100644 --- a/tests/functional/test_httplib2.py +++ b/tests/functional/test_httplib2.py @@ -38,14 +38,15 @@ from httpretty.core import decode_utf8 def test_httpretty_should_mock_a_simple_get_with_httplib2_read(now): "HTTPretty should mock a simple GET with httplib2.context.http" - HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/", - body="Find the best daily deals") + HTTPretty.register_uri( + HTTPretty.GET, "http://yipit.com/", body="Find the best daily deals" + ) - _, got = httplib2.Http().request('http://yipit.com', 'GET') - expect(got).to.equal(b'Find the best daily deals') + _, got = httplib2.Http().request("http://yipit.com", "GET") + expect(got).to.equal(b"Find the best daily deals") - expect(HTTPretty.last_request.method).to.equal('GET') - expect(HTTPretty.last_request.path).to.equal('/') + expect(HTTPretty.last_request.method).to.equal("GET") + expect(HTTPretty.last_request.path).to.equal("/") @httprettified @@ -53,14 +54,14 @@ def test_httpretty_should_mock_a_simple_get_with_httplib2_read(now): def test_httpretty_provides_easy_access_to_querystrings(now): "HTTPretty should provide an easy access to the querystring" - HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/", - body="Find the best daily deals") + HTTPretty.register_uri( + HTTPretty.GET, "http://yipit.com/", body="Find the best daily deals" + ) - httplib2.Http().request('http://yipit.com?foo=bar&foo=baz&chuck=norris', 'GET') - expect(HTTPretty.last_request.querystring).to.equal({ - 'foo': ['bar', 'baz'], - 'chuck': ['norris'], - }) + httplib2.Http().request("http://yipit.com?foo=bar&foo=baz&chuck=norris", "GET") + expect(HTTPretty.last_request.querystring).to.equal( + {"foo": ["bar", "baz"], "chuck": ["norris"]} + ) @httprettified @@ -68,20 +69,25 @@ def test_httpretty_provides_easy_access_to_querystrings(now): def test_httpretty_should_mock_headers_httplib2(now): "HTTPretty should mock basic headers with httplib2" - HTTPretty.register_uri(HTTPretty.GET, "http://github.com/", - body="this is supposed to be the response", - status=201) + HTTPretty.register_uri( + HTTPretty.GET, + "http://github.com/", + body="this is supposed to be the response", + status=201, + ) - headers, _ = httplib2.Http().request('http://github.com', 'GET') - expect(headers['status']).to.equal('201') - expect(dict(headers)).to.equal({ - 'content-type': 'text/plain; charset=utf-8', - 'connection': 'close', - 'content-length': '35', - 'status': '201', - 'server': 'Python/HTTPretty', - 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'), - }) + headers, _ = httplib2.Http().request("http://github.com", "GET") + expect(headers["status"]).to.equal("201") + expect(dict(headers)).to.equal( + { + "content-type": "text/plain; charset=utf-8", + "connection": "close", + "content-length": "35", + "status": "201", + "server": "Python/HTTPretty", + "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"), + } + ) @httprettified @@ -89,25 +95,30 @@ def test_httpretty_should_mock_headers_httplib2(now): def test_httpretty_should_allow_adding_and_overwritting_httplib2(now): "HTTPretty should allow adding and overwritting headers with httplib2" - HTTPretty.register_uri(HTTPretty.GET, "http://github.com/foo", - body="this is supposed to be the response", - adding_headers={ - 'Server': 'Apache', - 'Content-Length': '27', - 'Content-Type': 'application/json', - }) - - headers, _ = httplib2.Http().request('http://github.com/foo', 'GET') + HTTPretty.register_uri( + HTTPretty.GET, + "http://github.com/foo", + body="this is supposed to be the response", + adding_headers={ + "Server": "Apache", + "Content-Length": "27", + "Content-Type": "application/json", + }, + ) - expect(dict(headers)).to.equal({ - 'content-type': 'application/json', - 'content-location': 'http://github.com/foo', - 'connection': 'close', - 'content-length': '27', - 'status': '200', - 'server': 'Apache', - 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'), - }) + headers, _ = httplib2.Http().request("http://github.com/foo", "GET") + + expect(dict(headers)).to.equal( + { + "content-type": "application/json", + "content-location": "http://github.com/foo", + "connection": "close", + "content-length": "27", + "status": "200", + "server": "Apache", + "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"), + } + ) @httprettified @@ -115,52 +126,59 @@ def test_httpretty_should_allow_adding_and_overwritting_httplib2(now): def test_httpretty_should_allow_forcing_headers_httplib2(now): "HTTPretty should allow forcing headers with httplib2" - HTTPretty.register_uri(HTTPretty.GET, "http://github.com/foo", - body="this is supposed to be the response", - forcing_headers={ - 'Content-Type': 'application/xml', - }) - - headers, _ = httplib2.Http().request('http://github.com/foo', 'GET') + HTTPretty.register_uri( + HTTPretty.GET, + "http://github.com/foo", + body="this is supposed to be the response", + forcing_headers={"Content-Type": "application/xml"}, + ) - expect(dict(headers)).to.equal({ - 'content-location': 'http://github.com/foo', # httplib2 FORCES - # content-location - # even if the - # server does not - # provide it - 'content-type': 'application/xml', - 'status': '200', # httplib2 also ALWAYS put status on headers - }) + headers, _ = httplib2.Http().request("http://github.com/foo", "GET") + + expect(dict(headers)).to.equal( + { + "content-location": "http://github.com/foo", # httplib2 FORCES + # content-location + # even if the + # server does not + # provide it + "content-type": "application/xml", + "status": "200", # httplib2 also ALWAYS put status on headers + } + ) @httprettified @within(two=microseconds) def test_httpretty_should_allow_adding_and_overwritting_by_kwargs_u2(now): - "HTTPretty should allow adding and overwritting headers by keyword args " \ - "with httplib2" - - HTTPretty.register_uri(HTTPretty.GET, "http://github.com/foo", - body="this is supposed to be the response", - server='Apache', - content_length='27', - content_type='application/json') - - headers, _ = httplib2.Http().request('http://github.com/foo', 'GET') - - expect(dict(headers)).to.equal({ - 'content-type': 'application/json', - 'content-location': 'http://github.com/foo', # httplib2 FORCES - # content-location - # even if the - # server does not - # provide it - 'connection': 'close', - 'content-length': '27', - 'status': '200', - 'server': 'Apache', - 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'), - }) + "HTTPretty should allow adding and overwritting headers by keyword args " "with httplib2" + + HTTPretty.register_uri( + HTTPretty.GET, + "http://github.com/foo", + body="this is supposed to be the response", + server="Apache", + content_length="27", + content_type="application/json", + ) + + headers, _ = httplib2.Http().request("http://github.com/foo", "GET") + + expect(dict(headers)).to.equal( + { + "content-type": "application/json", + "content-location": "http://github.com/foo", # httplib2 FORCES + # content-location + # even if the + # server does not + # provide it + "connection": "close", + "content-length": "27", + "status": "200", + "server": "Apache", + "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"), + } + ) @httprettified @@ -169,29 +187,28 @@ def test_rotating_responses_with_httplib2(now): "HTTPretty should support rotating responses with httplib2" HTTPretty.register_uri( - HTTPretty.GET, "https://api.yahoo.com/test", + HTTPretty.GET, + "https://api.yahoo.com/test", responses=[ HTTPretty.Response(body="first response", status=201), - HTTPretty.Response(body='second and last response', status=202), - ]) + HTTPretty.Response(body="second and last response", status=202), + ], + ) - headers1, body1 = httplib2.Http().request( - 'https://api.yahoo.com/test', 'GET') + headers1, body1 = httplib2.Http().request("https://api.yahoo.com/test", "GET") - expect(headers1['status']).to.equal('201') - expect(body1).to.equal(b'first response') + expect(headers1["status"]).to.equal("201") + expect(body1).to.equal(b"first response") - headers2, body2 = httplib2.Http().request( - 'https://api.yahoo.com/test', 'GET') + headers2, body2 = httplib2.Http().request("https://api.yahoo.com/test", "GET") - expect(headers2['status']).to.equal('202') - expect(body2).to.equal(b'second and last response') + expect(headers2["status"]).to.equal("202") + expect(body2).to.equal(b"second and last response") - headers3, body3 = httplib2.Http().request( - 'https://api.yahoo.com/test', 'GET') + headers3, body3 = httplib2.Http().request("https://api.yahoo.com/test", "GET") - expect(headers3['status']).to.equal('202') - expect(body3).to.equal(b'second and last response') + expect(headers3["status"]).to.equal("202") + expect(body3).to.equal(b"second and last response") @httprettified @@ -199,24 +216,22 @@ def test_rotating_responses_with_httplib2(now): def test_can_inspect_last_request(now): "HTTPretty.last_request is a mimetools.Message request from last match" - HTTPretty.register_uri(HTTPretty.POST, "http://api.github.com/", - body='{"repositories": ["HTTPretty", "lettuce"]}') + HTTPretty.register_uri( + HTTPretty.POST, + "http://api.github.com/", + body='{"repositories": ["HTTPretty", "lettuce"]}', + ) headers, body = httplib2.Http().request( - 'http://api.github.com', 'POST', + "http://api.github.com", + "POST", body='{"username": "gabrielfalcao"}', - headers={ - 'content-type': 'text/json', - }, + headers={"content-type": "text/json"}, ) - expect(HTTPretty.last_request.method).to.equal('POST') - expect(HTTPretty.last_request.body).to.equal( - b'{"username": "gabrielfalcao"}', - ) - expect(HTTPretty.last_request.headers['content-type']).to.equal( - 'text/json', - ) + expect(HTTPretty.last_request.method).to.equal("POST") + expect(HTTPretty.last_request.body).to.equal(b'{"username": "gabrielfalcao"}') + expect(HTTPretty.last_request.headers["content-type"]).to.equal("text/json") expect(body).to.equal(b'{"repositories": ["HTTPretty", "lettuce"]}') @@ -225,24 +240,22 @@ def test_can_inspect_last_request(now): def test_can_inspect_last_request_with_ssl(now): "HTTPretty.last_request is recorded even when mocking 'https' (SSL)" - HTTPretty.register_uri(HTTPretty.POST, "https://secure.github.com/", - body='{"repositories": ["HTTPretty", "lettuce"]}') + HTTPretty.register_uri( + HTTPretty.POST, + "https://secure.github.com/", + body='{"repositories": ["HTTPretty", "lettuce"]}', + ) headers, body = httplib2.Http().request( - 'https://secure.github.com', 'POST', + "https://secure.github.com", + "POST", body='{"username": "gabrielfalcao"}', - headers={ - 'content-type': 'text/json', - }, + headers={"content-type": "text/json"}, ) - expect(HTTPretty.last_request.method).to.equal('POST') - expect(HTTPretty.last_request.body).to.equal( - b'{"username": "gabrielfalcao"}', - ) - expect(HTTPretty.last_request.headers['content-type']).to.equal( - 'text/json', - ) + expect(HTTPretty.last_request.method).to.equal("POST") + expect(HTTPretty.last_request.body).to.equal(b'{"username": "gabrielfalcao"}') + expect(HTTPretty.last_request.headers["content-type"]).to.equal("text/json") expect(body).to.equal(b'{"repositories": ["HTTPretty", "lettuce"]}') @@ -251,40 +264,42 @@ def test_can_inspect_last_request_with_ssl(now): def test_httpretty_ignores_querystrings_from_registered_uri(now): "Registering URIs with query string cause them to be ignored" - HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/?id=123", - body="Find the best daily deals") + HTTPretty.register_uri( + HTTPretty.GET, "http://yipit.com/?id=123", body="Find the best daily deals" + ) - _, got = httplib2.Http().request('http://yipit.com/?id=123', 'GET') + _, got = httplib2.Http().request("http://yipit.com/?id=123", "GET") - expect(got).to.equal(b'Find the best daily deals') - expect(HTTPretty.last_request.method).to.equal('GET') - expect(HTTPretty.last_request.path).to.equal('/?id=123') + expect(got).to.equal(b"Find the best daily deals") + expect(HTTPretty.last_request.method).to.equal("GET") + expect(HTTPretty.last_request.path).to.equal("/?id=123") @httprettified @within(two=microseconds) def test_callback_response(now): - ("HTTPretty should all a callback function to be set as the body with" - " httplib2") + ("HTTPretty should all a callback function to be set as the body with" " httplib2") def request_callback(request, uri, headers): - return [200, headers, "The {} response from {}".format(decode_utf8(request.method), uri)] + return [ + 200, + headers, + "The {} response from {}".format(decode_utf8(request.method), uri), + ] HTTPretty.register_uri( - HTTPretty.GET, "https://api.yahoo.com/test", - body=request_callback) + HTTPretty.GET, "https://api.yahoo.com/test", body=request_callback + ) - headers1, body1 = httplib2.Http().request( - 'https://api.yahoo.com/test', 'GET') + headers1, body1 = httplib2.Http().request("https://api.yahoo.com/test", "GET") expect(body1).to.equal(b"The GET response from https://api.yahoo.com/test") HTTPretty.register_uri( - HTTPretty.POST, "https://api.yahoo.com/test_post", - body=request_callback) + HTTPretty.POST, "https://api.yahoo.com/test_post", body=request_callback + ) - headers2, body2 = httplib2.Http().request( - 'https://api.yahoo.com/test_post', 'POST') + headers2, body2 = httplib2.Http().request("https://api.yahoo.com/test_post", "POST") expect(body2).to.equal(b"The POST response from https://api.yahoo.com/test_post") @@ -299,7 +314,9 @@ def test_httpretty_should_allow_registering_regexes(): body="Found brand", ) - response, body = httplib2.Http().request('https://api.yipit.com/v1/deal;brand=gap', 'GET') - expect(body).to.equal(b'Found brand') - expect(HTTPretty.last_request.method).to.equal('GET') - expect(HTTPretty.last_request.path).to.equal('/v1/deal;brand=gap') + response, body = httplib2.Http().request( + "https://api.yipit.com/v1/deal;brand=gap", "GET" + ) + expect(body).to.equal(b"Found brand") + expect(HTTPretty.last_request.method).to.equal("GET") + expect(HTTPretty.last_request.path).to.equal("/v1/deal;brand=gap") diff --git a/tests/functional/test_passthrough.py b/tests/functional/test_passthrough.py index f39cf2a..a37c09b 100644 --- a/tests/functional/test_passthrough.py +++ b/tests/functional/test_passthrough.py @@ -33,16 +33,16 @@ from httpretty import HTTPretty @skip def test_http_passthrough(): - url = 'http://httpbin.org/status/200' + url = "http://httpbin.org/status/200" response1 = requests.get(url) response1 = requests.get(url, stream=True) HTTPretty.enable() - HTTPretty.register_uri(HTTPretty.GET, 'http://google.com/', body="Not Google") + HTTPretty.register_uri(HTTPretty.GET, "http://google.com/", body="Not Google") - response2 = requests.get('http://google.com/') - expect(response2.content).to.equal(b'Not Google') + response2 = requests.get("http://google.com/") + expect(response2.content).to.equal(b"Not Google") response3 = requests.get(url, stream=True) (response3.content).should.equal(response1.content) @@ -55,15 +55,15 @@ def test_http_passthrough(): @skip def test_https_passthrough(): - url = 'https://raw.githubusercontent.com/gabrielfalcao/HTTPretty/master/COPYING' + url = "https://raw.githubusercontent.com/gabrielfalcao/HTTPretty/master/COPYING" response1 = requests.get(url, stream=True) HTTPretty.enable() - HTTPretty.register_uri(HTTPretty.GET, 'https://google.com/', body="Not Google") + HTTPretty.register_uri(HTTPretty.GET, "https://google.com/", body="Not Google") - response2 = requests.get('https://google.com/') - expect(response2.content).to.equal(b'Not Google') + response2 = requests.get("https://google.com/") + expect(response2.content).to.equal(b"Not Google") response3 = requests.get(url, stream=True) (response3.content).should.equal(response1.content) diff --git a/tests/functional/test_requests.py b/tests/functional/test_requests.py index 06bdc1e..e1a0a30 100644 --- a/tests/functional/test_requests.py +++ b/tests/functional/test_requests.py @@ -54,13 +54,14 @@ else: try: advance_iterator = next except NameError: + def advance_iterator(it): return it.next() next = advance_iterator -server_url = lambda path, port: "http://localhost:{}/{}".format(port, path.lstrip('/')) +server_url = lambda path, port: "http://localhost:{}/{}".format(port, path.lstrip("/")) @httprettified @@ -68,13 +69,14 @@ server_url = lambda path, port: "http://localhost:{}/{}".format(port, path.lstri def test_httpretty_should_mock_a_simple_get_with_requests_read(now): "HTTPretty should mock a simple GET with requests.get" - HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/", - body="Find the best daily deals") + HTTPretty.register_uri( + HTTPretty.GET, "http://yipit.com/", body="Find the best daily deals" + ) - response = requests.get('http://yipit.com') - expect(response.text).to.equal('Find the best daily deals') - expect(HTTPretty.last_request.method).to.equal('GET') - expect(HTTPretty.last_request.path).to.equal('/') + response = requests.get("http://yipit.com") + expect(response.text).to.equal("Find the best daily deals") + expect(HTTPretty.last_request.method).to.equal("GET") + expect(HTTPretty.last_request.path).to.equal("/") @httprettified @@ -82,14 +84,14 @@ def test_httpretty_should_mock_a_simple_get_with_requests_read(now): def test_httpretty_provides_easy_access_to_querystrings(now): "HTTPretty should provide an easy access to the querystring" - HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/", - body="Find the best daily deals") + HTTPretty.register_uri( + HTTPretty.GET, "http://yipit.com/", body="Find the best daily deals" + ) - requests.get('http://yipit.com/?foo=bar&foo=baz&chuck=norris') - expect(HTTPretty.last_request.querystring).to.equal({ - 'foo': ['bar', 'baz'], - 'chuck': ['norris'], - }) + requests.get("http://yipit.com/?foo=bar&foo=baz&chuck=norris") + expect(HTTPretty.last_request.querystring).to.equal( + {"foo": ["bar", "baz"], "chuck": ["norris"]} + ) @httprettified @@ -97,21 +99,26 @@ def test_httpretty_provides_easy_access_to_querystrings(now): def test_httpretty_should_mock_headers_requests(now): "HTTPretty should mock basic headers with requests" - HTTPretty.register_uri(HTTPretty.GET, "http://github.com/", - body="this is supposed to be the response", - status=201) + HTTPretty.register_uri( + HTTPretty.GET, + "http://github.com/", + body="this is supposed to be the response", + status=201, + ) - response = requests.get('http://github.com') + response = requests.get("http://github.com") expect(response.status_code).to.equal(201) - expect(dict(response.headers)).to.equal({ - 'content-type': 'text/plain; charset=utf-8', - 'connection': 'close', - 'content-length': '35', - 'status': '201', - 'server': 'Python/HTTPretty', - 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'), - }) + expect(dict(response.headers)).to.equal( + { + "content-type": "text/plain; charset=utf-8", + "connection": "close", + "content-length": "35", + "status": "201", + "server": "Python/HTTPretty", + "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"), + } + ) @httprettified @@ -119,24 +126,29 @@ def test_httpretty_should_mock_headers_requests(now): def test_httpretty_should_allow_adding_and_overwritting_requests(now): "HTTPretty should allow adding and overwritting headers with requests" - HTTPretty.register_uri(HTTPretty.GET, "http://github.com/foo", - body="this is supposed to be the response", - adding_headers={ - 'Server': 'Apache', - 'Content-Length': '27', - 'Content-Type': 'application/json', - }) - - response = requests.get('http://github.com/foo') + HTTPretty.register_uri( + HTTPretty.GET, + "http://github.com/foo", + body="this is supposed to be the response", + adding_headers={ + "Server": "Apache", + "Content-Length": "27", + "Content-Type": "application/json", + }, + ) - expect(dict(response.headers)).to.equal({ - 'content-type': 'application/json', - 'connection': 'close', - 'content-length': '27', - 'status': '200', - 'server': 'Apache', - 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'), - }) + response = requests.get("http://github.com/foo") + + expect(dict(response.headers)).to.equal( + { + "content-type": "application/json", + "connection": "close", + "content-length": "27", + "status": "200", + "server": "Apache", + "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"), + } + ) @httprettified @@ -144,43 +156,46 @@ def test_httpretty_should_allow_adding_and_overwritting_requests(now): def test_httpretty_should_allow_forcing_headers_requests(now): "HTTPretty should allow forcing headers with requests" - HTTPretty.register_uri(HTTPretty.GET, "http://github.com/foo", - body="<root><baz /</root>", - forcing_headers={ - 'Content-Type': 'application/xml', - 'Content-Length': '19', - }) + HTTPretty.register_uri( + HTTPretty.GET, + "http://github.com/foo", + body="<root><baz /</root>", + forcing_headers={"Content-Type": "application/xml", "Content-Length": "19"}, + ) - response = requests.get('http://github.com/foo') + response = requests.get("http://github.com/foo") - expect(dict(response.headers)).to.equal({ - 'content-type': 'application/xml', - 'content-length': '19', - }) + expect(dict(response.headers)).to.equal( + {"content-type": "application/xml", "content-length": "19"} + ) @httprettified @within(two=microseconds) def test_httpretty_should_allow_adding_and_overwritting_by_kwargs_u2(now): - "HTTPretty should allow adding and overwritting headers by keyword args " \ - "with requests" + "HTTPretty should allow adding and overwritting headers by keyword args " "with requests" - HTTPretty.register_uri(HTTPretty.GET, "http://github.com/foo", - body="this is supposed to be the response", - server='Apache', - content_length='27', - content_type='application/json') - - response = requests.get('http://github.com/foo') + HTTPretty.register_uri( + HTTPretty.GET, + "http://github.com/foo", + body="this is supposed to be the response", + server="Apache", + content_length="27", + content_type="application/json", + ) - expect(dict(response.headers)).to.equal({ - 'content-type': 'application/json', - 'connection': 'close', - 'content-length': '27', - 'status': '200', - 'server': 'Apache', - 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'), - }) + response = requests.get("http://github.com/foo") + + expect(dict(response.headers)).to.equal( + { + "content-type": "application/json", + "connection": "close", + "content-length": "27", + "status": "200", + "server": "Apache", + "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"), + } + ) @httprettified @@ -189,29 +204,28 @@ def test_rotating_responses_with_requests(now): "HTTPretty should support rotating responses with requests" HTTPretty.register_uri( - HTTPretty.GET, "https://api.yahoo.com/test", + HTTPretty.GET, + "https://api.yahoo.com/test", responses=[ HTTPretty.Response(body=b"first response", status=201), - HTTPretty.Response(body=b'second and last response', status=202), - ]) + HTTPretty.Response(body=b"second and last response", status=202), + ], + ) - response1 = requests.get( - 'https://api.yahoo.com/test') + response1 = requests.get("https://api.yahoo.com/test") expect(response1.status_code).to.equal(201) - expect(response1.text).to.equal('first response') + expect(response1.text).to.equal("first response") - response2 = requests.get( - 'https://api.yahoo.com/test') + response2 = requests.get("https://api.yahoo.com/test") expect(response2.status_code).to.equal(202) - expect(response2.text).to.equal('second and last response') + expect(response2.text).to.equal("second and last response") - response3 = requests.get( - 'https://api.yahoo.com/test') + response3 = requests.get("https://api.yahoo.com/test") expect(response3.status_code).to.equal(202) - expect(response3.text).to.equal('second and last response') + expect(response3.text).to.equal("second and last response") @httprettified @@ -219,24 +233,21 @@ def test_rotating_responses_with_requests(now): def test_can_inspect_last_request(now): "HTTPretty.last_request is a mimetools.Message request from last match" - HTTPretty.register_uri(HTTPretty.POST, "http://api.github.com/", - body='{"repositories": ["HTTPretty", "lettuce"]}') + HTTPretty.register_uri( + HTTPretty.POST, + "http://api.github.com/", + body='{"repositories": ["HTTPretty", "lettuce"]}', + ) response = requests.post( - 'http://api.github.com', + "http://api.github.com", '{"username": "gabrielfalcao"}', - headers={ - 'content-type': 'text/json', - }, + headers={"content-type": "text/json"}, ) - expect(HTTPretty.last_request.method).to.equal('POST') - expect(HTTPretty.last_request.body).to.equal( - b'{"username": "gabrielfalcao"}', - ) - expect(HTTPretty.last_request.headers['content-type']).to.equal( - 'text/json', - ) + expect(HTTPretty.last_request.method).to.equal("POST") + expect(HTTPretty.last_request.body).to.equal(b'{"username": "gabrielfalcao"}') + expect(HTTPretty.last_request.headers["content-type"]).to.equal("text/json") expect(response.json()).to.equal({"repositories": ["HTTPretty", "lettuce"]}) @@ -245,24 +256,21 @@ def test_can_inspect_last_request(now): def test_can_inspect_last_request_with_ssl(now): "HTTPretty.last_request is recorded even when mocking 'https' (SSL)" - HTTPretty.register_uri(HTTPretty.POST, "https://secure.github.com/", - body='{"repositories": ["HTTPretty", "lettuce"]}') + HTTPretty.register_uri( + HTTPretty.POST, + "https://secure.github.com/", + body='{"repositories": ["HTTPretty", "lettuce"]}', + ) response = requests.post( - 'https://secure.github.com', + "https://secure.github.com", '{"username": "gabrielfalcao"}', - headers={ - 'content-type': 'text/json', - }, + headers={"content-type": "text/json"}, ) - expect(HTTPretty.last_request.method).to.equal('POST') - expect(HTTPretty.last_request.body).to.equal( - b'{"username": "gabrielfalcao"}', - ) - expect(HTTPretty.last_request.headers['content-type']).to.equal( - 'text/json', - ) + expect(HTTPretty.last_request.method).to.equal("POST") + expect(HTTPretty.last_request.body).to.equal(b'{"username": "gabrielfalcao"}') + expect(HTTPretty.last_request.headers["content-type"]).to.equal("text/json") expect(response.json()).to.equal({"repositories": ["HTTPretty", "lettuce"]}) @@ -271,13 +279,14 @@ def test_can_inspect_last_request_with_ssl(now): def test_httpretty_ignores_querystrings_from_registered_uri(now): "HTTPretty should ignore querystrings from the registered uri (requests library)" - HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/?id=123", - body=b"Find the best daily deals") + HTTPretty.register_uri( + HTTPretty.GET, "http://yipit.com/?id=123", body=b"Find the best daily deals" + ) - response = requests.get('http://yipit.com/', params={'id': 123}) - expect(response.text).to.equal('Find the best daily deals') - expect(HTTPretty.last_request.method).to.equal('GET') - expect(HTTPretty.last_request.path).to.equal('/?id=123') + response = requests.get("http://yipit.com/", params={"id": 123}) + expect(response.text).to.equal("Find the best daily deals") + expect(HTTPretty.last_request.method).to.equal("GET") + expect(HTTPretty.last_request.path).to.equal("/?id=123") @httprettified @@ -300,6 +309,7 @@ def test_streaming_responses(now): def handler(signum, frame): raise AssertionError(message) + signal.signal(signal.SIGALRM, handler) signal.setitimer(signal.ITIMER_REAL, time) yield @@ -308,253 +318,270 @@ def test_streaming_responses(now): # XXX this obviously isn't a fully functional twitter streaming client! twitter_response_lines = [ b'{"text":"If \\"for the boobs\\" requests to follow me one more time I\'m calling the police. http://t.co/a0mDEAD8"}\r\n', - b'\r\n', - b'{"text":"RT @onedirection: Thanks for all your # FollowMe1D requests Directioners! We\u2019ll be following 10 people throughout the day starting NOW. G ..."}\r\n' + b"\r\n", + b'{"text":"RT @onedirection: Thanks for all your # FollowMe1D requests Directioners! We\u2019ll be following 10 people throughout the day starting NOW. G ..."}\r\n', ] TWITTER_STREAMING_URL = "https://stream.twitter.com/1/statuses/filter.json" - HTTPretty.register_uri(HTTPretty.POST, TWITTER_STREAMING_URL, - body=(l for l in twitter_response_lines), - streaming=True) + HTTPretty.register_uri( + HTTPretty.POST, + TWITTER_STREAMING_URL, + body=(l for l in twitter_response_lines), + streaming=True, + ) # taken from the requests docs # Http://docs.python-requests.org/en/latest/user/advanced/# streaming-requests - response = requests.post(TWITTER_STREAMING_URL, data={'track': 'requests'}, - auth=('username', 'password'), stream=True) + response = requests.post( + TWITTER_STREAMING_URL, + data={"track": "requests"}, + auth=("username", "password"), + stream=True, + ) # test iterating by line line_iter = response.iter_lines() - with in_time(0.01, 'Iterating by line is taking forever!'): + with in_time(0.01, "Iterating by line is taking forever!"): for i in xrange(len(twitter_response_lines)): - expect(next(line_iter).strip()).to.equal( - twitter_response_lines[i].strip()) + expect(next(line_iter).strip()).to.equal(twitter_response_lines[i].strip()) # test iterating by line after a second request response = requests.post( TWITTER_STREAMING_URL, - data={ - 'track': 'requests' - }, - auth=('username', 'password'), + data={"track": "requests"}, + auth=("username", "password"), stream=True, ) line_iter = response.iter_lines() - with in_time(0.01, 'Iterating by line is taking forever the second time ' - 'around!'): + with in_time( + 0.01, "Iterating by line is taking forever the second time " "around!" + ): for i in xrange(len(twitter_response_lines)): - expect(next(line_iter).strip()).to.equal( - twitter_response_lines[i].strip()) + expect(next(line_iter).strip()).to.equal(twitter_response_lines[i].strip()) # test iterating by char response = requests.post( TWITTER_STREAMING_URL, - data={ - 'track': 'requests' - }, - auth=('username', 'password'), - stream=True + data={"track": "requests"}, + auth=("username", "password"), + stream=True, ) - twitter_expected_response_body = b''.join(twitter_response_lines) - with in_time(0.02, 'Iterating by char is taking forever!'): - twitter_body = b''.join(c for c in response.iter_content(chunk_size=1)) + twitter_expected_response_body = b"".join(twitter_response_lines) + with in_time(0.02, "Iterating by char is taking forever!"): + twitter_body = b"".join(c for c in response.iter_content(chunk_size=1)) expect(twitter_body).to.equal(twitter_expected_response_body) # test iterating by chunks larger than the stream - response = requests.post(TWITTER_STREAMING_URL, data={'track': 'requests'}, - auth=('username', 'password'), stream=True) + response = requests.post( + TWITTER_STREAMING_URL, + data={"track": "requests"}, + auth=("username", "password"), + stream=True, + ) - with in_time(0.02, 'Iterating by large chunks is taking forever!'): - twitter_body = b''.join(c for c in - response.iter_content(chunk_size=1024)) + with in_time(0.02, "Iterating by large chunks is taking forever!"): + twitter_body = b"".join(c for c in response.iter_content(chunk_size=1024)) expect(twitter_body).to.equal(twitter_expected_response_body) @httprettified def test_multiline(): - url = 'http://httpbin.org/post' - data = b'content=Im\r\na multiline\r\n\r\nsentence\r\n' + url = "http://httpbin.org/post" + data = b"content=Im\r\na multiline\r\n\r\nsentence\r\n" headers = { - 'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8', - 'Accept': 'text/plain', + "Content-Type": "application/x-www-form-urlencoded; charset=utf-8", + "Accept": "text/plain", } - HTTPretty.register_uri( - HTTPretty.POST, - url, - ) + HTTPretty.register_uri(HTTPretty.POST, url) response = requests.post(url, data=data, headers=headers) expect(response.status_code).to.equal(200) - expect(HTTPretty.last_request.method).to.equal('POST') - expect(HTTPretty.last_request.path).to.equal('/post') + expect(HTTPretty.last_request.method).to.equal("POST") + expect(HTTPretty.last_request.path).to.equal("/post") expect(HTTPretty.last_request.body).to.equal(data) - expect(HTTPretty.last_request.headers['content-length']).to.equal('37') - expect(HTTPretty.last_request.headers['content-type']).to.equal('application/x-www-form-urlencoded; charset=utf-8') + expect(HTTPretty.last_request.headers["content-length"]).to.equal("37") + expect(HTTPretty.last_request.headers["content-type"]).to.equal( + "application/x-www-form-urlencoded; charset=utf-8" + ) expect(len(HTTPretty.latest_requests)).to.equal(1) @httprettified def test_octet_stream(): - url = 'http://httpbin.org/post' + url = "http://httpbin.org/post" data = b"\xf5\x00\x00\x00" # utf-8 with invalid start byte - headers = { - 'Content-Type': 'application/octet-stream', - } - HTTPretty.register_uri( - HTTPretty.POST, - url, - ) + headers = {"Content-Type": "application/octet-stream"} + HTTPretty.register_uri(HTTPretty.POST, url) response = requests.post(url, data=data, headers=headers) expect(response.status_code).to.equal(200) - expect(HTTPretty.last_request.method).to.equal('POST') - expect(HTTPretty.last_request.path).to.equal('/post') + expect(HTTPretty.last_request.method).to.equal("POST") + expect(HTTPretty.last_request.path).to.equal("/post") expect(HTTPretty.last_request.body).to.equal(data) - expect(HTTPretty.last_request.headers['content-length']).to.equal('4') - expect(HTTPretty.last_request.headers['content-type']).to.equal('application/octet-stream') + expect(HTTPretty.last_request.headers["content-length"]).to.equal("4") + expect(HTTPretty.last_request.headers["content-type"]).to.equal( + "application/octet-stream" + ) expect(len(HTTPretty.latest_requests)).to.equal(1) @httprettified def test_multipart(): - url = 'http://httpbin.org/post' + url = "http://httpbin.org/post" data = b'--xXXxXXyYYzzz\r\nContent-Disposition: form-data; name="content"\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: 68\r\n\r\nAction: comment\nText: Comment with attach\nAttachment: x1.txt, x2.txt\r\n--xXXxXXyYYzzz\r\nContent-Disposition: form-data; name="attachment_2"; filename="x.txt"\r\nContent-Type: text/plain\r\nContent-Length: 4\r\n\r\nbye\n\r\n--xXXxXXyYYzzz\r\nContent-Disposition: form-data; name="attachment_1"; filename="x.txt"\r\nContent-Type: text/plain\r\nContent-Length: 4\r\n\r\nbye\n\r\n--xXXxXXyYYzzz--\r\n' - headers = {'Content-Length': '495', 'Content-Type': 'multipart/form-data; boundary=xXXxXXyYYzzz', 'Accept': 'text/plain'} - HTTPretty.register_uri( - HTTPretty.POST, - url, - ) + headers = { + "Content-Length": "495", + "Content-Type": "multipart/form-data; boundary=xXXxXXyYYzzz", + "Accept": "text/plain", + } + HTTPretty.register_uri(HTTPretty.POST, url) response = requests.post(url, data=data, headers=headers) expect(response.status_code).to.equal(200) - expect(HTTPretty.last_request.method).to.equal('POST') - expect(HTTPretty.last_request.path).to.equal('/post') + expect(HTTPretty.last_request.method).to.equal("POST") + expect(HTTPretty.last_request.path).to.equal("/post") expect(HTTPretty.last_request.body).to.equal(data) - expect(HTTPretty.last_request.headers['content-length']).to.equal('495') - expect(HTTPretty.last_request.headers['content-type']).to.equal('multipart/form-data; boundary=xXXxXXyYYzzz') + expect(HTTPretty.last_request.headers["content-length"]).to.equal("495") + expect(HTTPretty.last_request.headers["content-type"]).to.equal( + "multipart/form-data; boundary=xXXxXXyYYzzz" + ) expect(len(HTTPretty.latest_requests)).to.equal(1) @httprettified @within(two=microseconds) def test_callback_response(now): - ("HTTPretty should call a callback function and set its return value as the body of the response" - " requests") + ( + "HTTPretty should call a callback function and set its return value as the body of the response" + " requests" + ) def request_callback(request, uri, headers): - return [200, headers, "The {} response from {}".format(decode_utf8(request.method), uri)] + return [ + 200, + headers, + "The {} response from {}".format(decode_utf8(request.method), uri), + ] HTTPretty.register_uri( - HTTPretty.GET, "https://api.yahoo.com/test", - body=request_callback) + HTTPretty.GET, "https://api.yahoo.com/test", body=request_callback + ) - response = requests.get('https://api.yahoo.com/test') + response = requests.get("https://api.yahoo.com/test") expect(response.text).to.equal("The GET response from https://api.yahoo.com/test") HTTPretty.register_uri( - HTTPretty.POST, "https://api.yahoo.com/test_post", - body=request_callback) + HTTPretty.POST, "https://api.yahoo.com/test_post", body=request_callback + ) response = requests.post( - "https://api.yahoo.com/test_post", - {"username": "gabrielfalcao"} + "https://api.yahoo.com/test_post", {"username": "gabrielfalcao"} ) - expect(response.text).to.equal("The POST response from https://api.yahoo.com/test_post") + expect(response.text).to.equal( + "The POST response from https://api.yahoo.com/test_post" + ) @httprettified @within(two=microseconds) def test_callback_body_remains_callable_for_any_subsequent_requests(now): - ("HTTPretty should call a callback function more than one" - " requests") + ("HTTPretty should call a callback function more than one" " requests") def request_callback(request, uri, headers): - return [200, headers, "The {} response from {}".format(decode_utf8(request.method), uri)] + return [ + 200, + headers, + "The {} response from {}".format(decode_utf8(request.method), uri), + ] HTTPretty.register_uri( - HTTPretty.GET, "https://api.yahoo.com/test", - body=request_callback) + HTTPretty.GET, "https://api.yahoo.com/test", body=request_callback + ) - response = requests.get('https://api.yahoo.com/test') + response = requests.get("https://api.yahoo.com/test") expect(response.text).to.equal("The GET response from https://api.yahoo.com/test") - response = requests.get('https://api.yahoo.com/test') + response = requests.get("https://api.yahoo.com/test") expect(response.text).to.equal("The GET response from https://api.yahoo.com/test") @httprettified @within(two=microseconds) def test_callback_setting_headers_and_status_response(now): - ("HTTPretty should call a callback function and uses it retur tuple as status code, headers and body" - " requests") + ( + "HTTPretty should call a callback function and uses it retur tuple as status code, headers and body" + " requests" + ) def request_callback(request, uri, headers): - headers.update({'a': 'b'}) - return [418, headers, "The {} response from {}".format(decode_utf8(request.method), uri)] + headers.update({"a": "b"}) + return [ + 418, + headers, + "The {} response from {}".format(decode_utf8(request.method), uri), + ] HTTPretty.register_uri( - HTTPretty.GET, "https://api.yahoo.com/test", - body=request_callback) + HTTPretty.GET, "https://api.yahoo.com/test", body=request_callback + ) - response = requests.get('https://api.yahoo.com/test') + response = requests.get("https://api.yahoo.com/test") expect(response.text).to.equal("The GET response from https://api.yahoo.com/test") - expect(response.headers).to.have.key('a').being.equal("b") + expect(response.headers).to.have.key("a").being.equal("b") expect(response.status_code).to.equal(418) HTTPretty.register_uri( - HTTPretty.POST, "https://api.yahoo.com/test_post", - body=request_callback) + HTTPretty.POST, "https://api.yahoo.com/test_post", body=request_callback + ) response = requests.post( - "https://api.yahoo.com/test_post", - {"username": "gabrielfalcao"} + "https://api.yahoo.com/test_post", {"username": "gabrielfalcao"} ) - expect(response.text).to.equal("The POST response from https://api.yahoo.com/test_post") - expect(response.headers).to.have.key('a').being.equal("b") + expect(response.text).to.equal( + "The POST response from https://api.yahoo.com/test_post" + ) + expect(response.headers).to.have.key("a").being.equal("b") expect(response.status_code).to.equal(418) @httprettified def test_httpretty_should_respect_matcher_priority(): HTTPretty.register_uri( - HTTPretty.GET, - re.compile(r".*"), - body='high priority', - priority=5, + HTTPretty.GET, re.compile(r".*"), body="high priority", priority=5 ) HTTPretty.register_uri( - HTTPretty.GET, - re.compile(r".+"), - body='low priority', - priority=0, + HTTPretty.GET, re.compile(r".+"), body="low priority", priority=0 ) - response = requests.get('http://api.yipit.com/v1/') - expect(response.text).to.equal('high priority') + response = requests.get("http://api.yipit.com/v1/") + expect(response.text).to.equal("high priority") @httprettified @within(two=microseconds) def test_callback_setting_content_length_on_head(now): - ("HTTPretty should call a callback function, use it's return tuple as status code, headers and body" - " requests and respect the content-length header when responding to HEAD") + ( + "HTTPretty should call a callback function, use it's return tuple as status code, headers and body" + " requests and respect the content-length header when responding to HEAD" + ) def request_callback(request, uri, headers): - headers.update({'content-length': 12345}) + headers.update({"content-length": 12345}) return [200, headers, ""] HTTPretty.register_uri( - HTTPretty.HEAD, "https://api.yahoo.com/test", - body=request_callback) + HTTPretty.HEAD, "https://api.yahoo.com/test", body=request_callback + ) - response = requests.head('https://api.yahoo.com/test') - expect(response.headers).to.have.key('content-length').being.equal("12345") + response = requests.head("https://api.yahoo.com/test") + expect(response.headers).to.have.key("content-length").being.equal("12345") expect(response.status_code).to.equal(200) @@ -565,14 +592,20 @@ def test_httpretty_should_allow_registering_regexes_and_give_a_proper_match_to_t HTTPretty.register_uri( HTTPretty.GET, re.compile(r"https://api.yipit.com/v1/deal;brand=(?P<brand_name>\w+)"), - body=lambda method, uri, headers: [200, headers, uri] + body=lambda method, uri, headers: [200, headers, uri], ) - response = requests.get('https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris') + response = requests.get( + "https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris" + ) - expect(response.text).to.equal('https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris') - expect(HTTPretty.last_request.method).to.equal('GET') - expect(HTTPretty.last_request.path).to.equal('/v1/deal;brand=gap?first_name=chuck&last_name=norris') + expect(response.text).to.equal( + "https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris" + ) + expect(HTTPretty.last_request.method).to.equal("GET") + expect(HTTPretty.last_request.path).to.equal( + "/v1/deal;brand=gap?first_name=chuck&last_name=norris" + ) @httprettified @@ -585,11 +618,14 @@ def test_httpretty_should_allow_registering_regexes(): body="Found brand", ) - response = requests.get('https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris' - ) - expect(response.text).to.equal('Found brand') - expect(HTTPretty.last_request.method).to.equal('GET') - expect(HTTPretty.last_request.path).to.equal('/v1/deal;brand=gap?first_name=chuck&last_name=norris') + response = requests.get( + "https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris" + ) + expect(response.text).to.equal("Found brand") + expect(HTTPretty.last_request.method).to.equal("GET") + expect(HTTPretty.last_request.path).to.equal( + "/v1/deal;brand=gap?first_name=chuck&last_name=norris" + ) @httprettified @@ -599,15 +635,16 @@ def test_httpretty_provides_easy_access_to_querystrings_with_regexes(): HTTPretty.register_uri( HTTPretty.GET, re.compile(r"https://api.yipit.com/v1/(?P<endpoint>\w+)/$"), - body="Find the best daily deals" + body="Find the best daily deals", ) - response = requests.get('https://api.yipit.com/v1/deals/?foo=bar&foo=baz&chuck=norris') + response = requests.get( + "https://api.yipit.com/v1/deals/?foo=bar&foo=baz&chuck=norris" + ) expect(response.text).to.equal("Find the best daily deals") - expect(HTTPretty.last_request.querystring).to.equal({ - 'foo': ['bar', 'baz'], - 'chuck': ['norris'], - }) + expect(HTTPretty.last_request.querystring).to.equal( + {"foo": ["bar", "baz"], "chuck": ["norris"]} + ) @httprettified @@ -619,28 +656,24 @@ def test_httpretty_allows_to_chose_if_querystring_should_be_matched(): HTTPretty.GET, re.compile(r"https://example.org/(?P<endpoint>\w+)/$"), body="Nudge, nudge, wink, wink. Know what I mean?", - match_querystring=True + match_querystring=True, ) - response = requests.get('https://example.org/what/') - expect(response.text).to.equal('Nudge, nudge, wink, wink. Know what I mean?') + response = requests.get("https://example.org/what/") + expect(response.text).to.equal("Nudge, nudge, wink, wink. Know what I mean?") - response = requests.get('https://example.org/what/?flying=coconuts') - expect(response.text).to.not_be.equal('Nudge, nudge, wink, wink. Know what I mean?') + response = requests.get("https://example.org/what/?flying=coconuts") + expect(response.text).to.not_be.equal("Nudge, nudge, wink, wink. Know what I mean?") @httprettified def test_httpretty_should_allow_multiple_methods_for_the_same_uri(): "HTTPretty should allow registering multiple methods for the same uri" - url = 'http://test.com/test' - methods = ['GET', 'POST', 'PUT', 'OPTIONS'] + url = "http://test.com/test" + methods = ["GET", "POST", "PUT", "OPTIONS"] for method in methods: - HTTPretty.register_uri( - getattr(HTTPretty, method), - url, - method - ) + HTTPretty.register_uri(getattr(HTTPretty, method), url, method) for method in methods: request_action = getattr(requests, method.lower()) @@ -651,10 +684,11 @@ def test_httpretty_should_allow_multiple_methods_for_the_same_uri(): def test_httpretty_should_allow_registering_regexes_with_streaming_responses(): "HTTPretty should allow registering regexes with streaming responses" import os - os.environ['DEBUG'] = 'true' + + os.environ["DEBUG"] = "true" def my_callback(request, url, headers): - request.body.should.equal(b'hithere') + request.body.should.equal(b"hithere") return 200, headers, "Received" HTTPretty.register_uri( @@ -664,52 +698,50 @@ def test_httpretty_should_allow_registering_regexes_with_streaming_responses(): ) def gen(): - yield b'hi' - yield b'there' + yield b"hi" + yield b"there" response = requests.post( - 'https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris', + "https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris", data=gen(), ) expect(response.content).to.equal(b"Received") - expect(HTTPretty.last_request.method).to.equal('POST') - expect(HTTPretty.last_request.path).to.equal('/v1/deal;brand=gap?first_name=chuck&last_name=norris') + expect(HTTPretty.last_request.method).to.equal("POST") + expect(HTTPretty.last_request.path).to.equal( + "/v1/deal;brand=gap?first_name=chuck&last_name=norris" + ) @httprettified def test_httpretty_should_allow_multiple_responses_with_multiple_methods(): "HTTPretty should allow multiple responses when binding multiple methods to the same uri" - url = 'http://test.com/list' + url = "http://test.com/list" # add get responses HTTPretty.register_uri( - HTTPretty.GET, url, - responses=[ - HTTPretty.Response(body='a'), - HTTPretty.Response(body='b'), - ] + HTTPretty.GET, + url, + responses=[HTTPretty.Response(body="a"), HTTPretty.Response(body="b")], ) # add post responses HTTPretty.register_uri( - HTTPretty.POST, url, - responses=[ - HTTPretty.Response(body='c'), - HTTPretty.Response(body='d'), - ] + HTTPretty.POST, + url, + responses=[HTTPretty.Response(body="c"), HTTPretty.Response(body="d")], ) - expect(requests.get(url).text).to.equal('a') - expect(requests.post(url).text).to.equal('c') + expect(requests.get(url).text).to.equal("a") + expect(requests.post(url).text).to.equal("c") - expect(requests.get(url).text).to.equal('b') - expect(requests.get(url).text).to.equal('b') - expect(requests.get(url).text).to.equal('b') + expect(requests.get(url).text).to.equal("b") + expect(requests.get(url).text).to.equal("b") + expect(requests.get(url).text).to.equal("b") - expect(requests.post(url).text).to.equal('d') - expect(requests.post(url).text).to.equal('d') - expect(requests.post(url).text).to.equal('d') + expect(requests.post(url).text).to.equal("d") + expect(requests.post(url).text).to.equal("d") + expect(requests.post(url).text).to.equal("d") @httprettified @@ -717,19 +749,18 @@ def test_httpretty_should_normalize_url_patching(): "HTTPretty should normalize all url patching" HTTPretty.register_uri( - HTTPretty.GET, - "http://yipit.com/foo(bar)", - body="Find the best daily deals") + HTTPretty.GET, "http://yipit.com/foo(bar)", body="Find the best daily deals" + ) - response = requests.get('http://yipit.com/foo%28bar%29') - expect(response.text).to.equal('Find the best daily deals') + response = requests.get("http://yipit.com/foo%28bar%29") + expect(response.text).to.equal("Find the best daily deals") @httprettified def test_lack_of_trailing_slash(): ("HTTPretty should automatically append a slash to given urls") - url = 'http://www.youtube.com' - HTTPretty.register_uri(HTTPretty.GET, url, body='') + url = "http://www.youtube.com" + HTTPretty.register_uri(HTTPretty.GET, url, body="") response = requests.get(url) response.status_code.should.equal(200) @@ -737,10 +768,13 @@ def test_lack_of_trailing_slash(): @httprettified def test_unicode_querystrings(): ("Querystrings should accept unicode characters") - HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/login", - body="Find the best daily deals") - requests.get('http://yipit.com/login?user=Gabriel+Falcão') - expect(HTTPretty.last_request.querystring['user'][0]).should.be.equal('Gabriel Falcão') + HTTPretty.register_uri( + HTTPretty.GET, "http://yipit.com/login", body="Find the best daily deals" + ) + requests.get("http://yipit.com/login?user=Gabriel+Falcão") + expect(HTTPretty.last_request.querystring["user"][0]).should.be.equal( + "Gabriel Falcão" + ) @use_tornado_server @@ -752,9 +786,11 @@ def test_recording_calls(port): # When I record some calls with HTTPretty.record(destination): requests.get(server_url("/foobar?name=Gabriel&age=25", port)) - requests.post(server_url("/foobar", port), - data=json.dumps({'test': '123'}), - headers={"Test": "foobar"}) + requests.post( + server_url("/foobar", port), + data=json.dumps({"test": "123"}), + headers={"Test": "foobar"}, + ) # Then the destination path should exist os.path.exists(destination).should.be.true @@ -773,25 +809,24 @@ def test_recording_calls(port): response.should.have.key("request").being.length_of(5) response.should.have.key("response").being.length_of(3) - response['request'].should.have.key("method").being.equal("GET") - response['request'].should.have.key("headers").being.a(dict) - response['request'].should.have.key("querystring").being.equal({ - "age": [ - "25" - ], - "name": [ - "Gabriel" - ] - }) - response['response'].should.have.key("status").being.equal(200) - response['response'].should.have.key("body").being.an(text_type) - response['response'].should.have.key("headers").being.a(dict) + response["request"].should.have.key("method").being.equal("GET") + response["request"].should.have.key("headers").being.a(dict) + response["request"].should.have.key("querystring").being.equal( + {"age": ["25"], "name": ["Gabriel"]} + ) + response["response"].should.have.key("status").being.equal(200) + response["response"].should.have.key("body").being.an(text_type) + response["response"].should.have.key("headers").being.a(dict) # older urllib3 had a bug where header keys were lower-cased: # https://github.com/shazow/urllib3/issues/236 # cope with that - if 'server' in response['response']["headers"]: - response['response']["headers"]["Server"] = response['response']["headers"].pop("server") - response['response']["headers"].should.have.key("Server").being.equal("TornadoServer/" + tornado_version) + if "server" in response["response"]["headers"]: + response["response"]["headers"]["Server"] = response["response"]["headers"].pop( + "server" + ) + response["response"]["headers"].should.have.key("Server").being.equal( + "TornadoServer/" + tornado_version + ) # And When I playback the previously recorded calls with HTTPretty.playback(destination): @@ -799,7 +834,7 @@ def test_recording_calls(port): response1 = requests.get(server_url("/foobar?name=Gabriel&age=25", port)) response2 = requests.post( server_url("/foobar", port), - data=json.dumps({'test': '123'}), + data=json.dumps({"test": "123"}), headers={"Test": "foobar"}, ) @@ -813,26 +848,29 @@ def test_recording_calls(port): @httprettified def test_py26_callback_response(): - ("HTTPretty should call a callback function *once* and set its return value" - " as the body of the response requests") + ( + "HTTPretty should call a callback function *once* and set its return value" + " as the body of the response requests" + ) from mock import Mock def _request_callback(request, uri, headers): - return [200, headers, "The {} response from {}".format(decode_utf8(request.method), uri)] + return [ + 200, + headers, + "The {} response from {}".format(decode_utf8(request.method), uri), + ] request_callback = Mock() request_callback.side_effect = _request_callback HTTPretty.register_uri( - HTTPretty.POST, "https://api.yahoo.com/test_post", - body=request_callback) - - requests.post( - "https://api.yahoo.com/test_post", - {"username": "gabrielfalcao"} + HTTPretty.POST, "https://api.yahoo.com/test_post", body=request_callback ) - os.environ['STOP'] = 'true' + + requests.post("https://api.yahoo.com/test_post", {"username": "gabrielfalcao"}) + os.environ["STOP"] = "true" expect(request_callback.call_count).equal(1) @@ -843,25 +881,31 @@ def test_httpretty_should_work_with_non_standard_ports(): HTTPretty.register_uri( HTTPretty.GET, re.compile(r"https://api.yipit.com:1234/v1/deal;brand=(?P<brand_name>\w+)"), - body=lambda method, uri, headers: [200, headers, uri] + body=lambda method, uri, headers: [200, headers, uri], ) HTTPretty.register_uri( HTTPretty.POST, "https://asdf.com:666/meow", - body=lambda method, uri, headers: [200, headers, uri] + body=lambda method, uri, headers: [200, headers, uri], ) - response = requests.get('https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris') + response = requests.get( + "https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris" + ) - expect(response.text).to.equal('https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris') - expect(HTTPretty.last_request.method).to.equal('GET') - expect(HTTPretty.last_request.path).to.equal('/v1/deal;brand=gap?first_name=chuck&last_name=norris') + expect(response.text).to.equal( + "https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris" + ) + expect(HTTPretty.last_request.method).to.equal("GET") + expect(HTTPretty.last_request.path).to.equal( + "/v1/deal;brand=gap?first_name=chuck&last_name=norris" + ) - response = requests.post('https://asdf.com:666/meow') + response = requests.post("https://asdf.com:666/meow") - expect(response.text).to.equal('https://asdf.com:666/meow') - expect(HTTPretty.last_request.method).to.equal('POST') - expect(HTTPretty.last_request.path).to.equal('/meow') + expect(response.text).to.equal("https://asdf.com:666/meow") + expect(HTTPretty.last_request.method).to.equal("POST") + expect(HTTPretty.last_request.path).to.equal("/meow") @httprettified @@ -871,28 +915,28 @@ def test_httpretty_reset_by_switching_protocols_for_same_port(): HTTPretty.register_uri( HTTPretty.GET, "http://api.yipit.com:1234/v1/deal", - body=lambda method, uri, headers: [200, headers, uri] + body=lambda method, uri, headers: [200, headers, uri], ) - response = requests.get('http://api.yipit.com:1234/v1/deal') + response = requests.get("http://api.yipit.com:1234/v1/deal") - expect(response.text).to.equal('http://api.yipit.com:1234/v1/deal') - expect(HTTPretty.last_request.method).to.equal('GET') - expect(HTTPretty.last_request.path).to.equal('/v1/deal') + expect(response.text).to.equal("http://api.yipit.com:1234/v1/deal") + expect(HTTPretty.last_request.method).to.equal("GET") + expect(HTTPretty.last_request.path).to.equal("/v1/deal") HTTPretty.reset() HTTPretty.register_uri( HTTPretty.GET, "https://api.yipit.com:1234/v1/deal", - body=lambda method, uri, headers: [200, headers, uri] + body=lambda method, uri, headers: [200, headers, uri], ) - response = requests.get('https://api.yipit.com:1234/v1/deal') + response = requests.get("https://api.yipit.com:1234/v1/deal") - expect(response.text).to.equal('https://api.yipit.com:1234/v1/deal') - expect(HTTPretty.last_request.method).to.equal('GET') - expect(HTTPretty.last_request.path).to.equal('/v1/deal') + expect(response.text).to.equal("https://api.yipit.com:1234/v1/deal") + expect(HTTPretty.last_request.method).to.equal("GET") + expect(HTTPretty.last_request.path).to.equal("/v1/deal") @httprettified @@ -902,11 +946,17 @@ def test_httpretty_should_allow_registering_regexes_with_port_and_give_a_proper_ HTTPretty.register_uri( HTTPretty.GET, re.compile(r"https://api.yipit.com:1234/v1/deal;brand=(?P<brand_name>\w+)"), - body=lambda method, uri, headers: [200, headers, uri] + body=lambda method, uri, headers: [200, headers, uri], ) - response = requests.get('https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris') + response = requests.get( + "https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris" + ) - expect(response.text).to.equal('https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris') - expect(HTTPretty.last_request.method).to.equal('GET') - expect(HTTPretty.last_request.path).to.equal('/v1/deal;brand=gap?first_name=chuck&last_name=norris') + expect(response.text).to.equal( + "https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris" + ) + expect(HTTPretty.last_request.method).to.equal("GET") + expect(HTTPretty.last_request.path).to.equal( + "/v1/deal;brand=gap?first_name=chuck&last_name=norris" + ) diff --git a/tests/functional/test_urllib2.py b/tests/functional/test_urllib2.py index c2a1732..a101487 100644 --- a/tests/functional/test_urllib2.py +++ b/tests/functional/test_urllib2.py @@ -27,11 +27,13 @@ from __future__ import unicode_literals import re + try: from urllib.request import urlopen import urllib.request as urllib2 except ImportError: import urllib2 + urlopen = urllib2.urlopen from sure import within, microseconds @@ -44,14 +46,15 @@ from httpretty.core import decode_utf8 def test_httpretty_should_mock_a_simple_get_with_urllib2_read(): "HTTPretty should mock a simple GET with urllib2.read()" - HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/", - body="Find the best daily deals") + HTTPretty.register_uri( + HTTPretty.GET, "http://yipit.com/", body="Find the best daily deals" + ) - fd = urlopen('http://yipit.com') + fd = urlopen("http://yipit.com") got = fd.read() fd.close() - got.should.equal(b'Find the best daily deals') + got.should.equal(b"Find the best daily deals") @httprettified @@ -59,17 +62,17 @@ def test_httpretty_should_mock_a_simple_get_with_urllib2_read(): def test_httpretty_provides_easy_access_to_querystrings(now): "HTTPretty should provide an easy access to the querystring" - HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/", - body="Find the best daily deals") + HTTPretty.register_uri( + HTTPretty.GET, "http://yipit.com/", body="Find the best daily deals" + ) - fd = urllib2.urlopen('http://yipit.com/?foo=bar&foo=baz&chuck=norris') + fd = urllib2.urlopen("http://yipit.com/?foo=bar&foo=baz&chuck=norris") fd.read() fd.close() - HTTPretty.last_request.querystring.should.equal({ - 'foo': ['bar', 'baz'], - 'chuck': ['norris'], - }) + HTTPretty.last_request.querystring.should.equal( + {"foo": ["bar", "baz"], "chuck": ["norris"]} + ) @httprettified @@ -77,24 +80,29 @@ def test_httpretty_provides_easy_access_to_querystrings(now): def test_httpretty_should_mock_headers_urllib2(now): "HTTPretty should mock basic headers with urllib2" - HTTPretty.register_uri(HTTPretty.GET, "http://github.com/", - body="this is supposed to be the response", - status=201) + HTTPretty.register_uri( + HTTPretty.GET, + "http://github.com/", + body="this is supposed to be the response", + status=201, + ) - request = urlopen('http://github.com') + request = urlopen("http://github.com") headers = dict(request.headers) request.close() request.code.should.equal(201) - headers.should.equal({ - 'content-type': 'text/plain; charset=utf-8', - 'connection': 'close', - 'content-length': '35', - 'status': '201', - 'server': 'Python/HTTPretty', - 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'), - }) + headers.should.equal( + { + "content-type": "text/plain; charset=utf-8", + "connection": "close", + "content-length": "35", + "status": "201", + "server": "Python/HTTPretty", + "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"), + } + ) @httprettified @@ -102,27 +110,32 @@ def test_httpretty_should_mock_headers_urllib2(now): def test_httpretty_should_allow_adding_and_overwritting_urllib2(now): "HTTPretty should allow adding and overwritting headers with urllib2" - HTTPretty.register_uri(HTTPretty.GET, "http://github.com/", - body="this is supposed to be the response", - adding_headers={ - 'Server': 'Apache', - 'Content-Length': '27', - 'Content-Type': 'application/json', - }) + HTTPretty.register_uri( + HTTPretty.GET, + "http://github.com/", + body="this is supposed to be the response", + adding_headers={ + "Server": "Apache", + "Content-Length": "27", + "Content-Type": "application/json", + }, + ) - request = urlopen('http://github.com') + request = urlopen("http://github.com") headers = dict(request.headers) request.close() request.code.should.equal(200) - headers.should.equal({ - 'content-type': 'application/json', - 'connection': 'close', - 'content-length': '27', - 'status': '200', - 'server': 'Apache', - 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'), - }) + headers.should.equal( + { + "content-type": "application/json", + "connection": "close", + "content-length": "27", + "status": "200", + "server": "Apache", + "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"), + } + ) @httprettified @@ -130,82 +143,87 @@ def test_httpretty_should_allow_adding_and_overwritting_urllib2(now): def test_httpretty_should_allow_forcing_headers_urllib2(): "HTTPretty should allow forcing headers with urllib2" - HTTPretty.register_uri(HTTPretty.GET, "http://github.com/", - body="this is supposed to be the response", - forcing_headers={ - 'Content-Type': 'application/xml', - 'Content-Length': '35a', - }) + HTTPretty.register_uri( + HTTPretty.GET, + "http://github.com/", + body="this is supposed to be the response", + forcing_headers={"Content-Type": "application/xml", "Content-Length": "35a"}, + ) - request = urlopen('http://github.com') + request = urlopen("http://github.com") headers = dict(request.headers) request.close() - headers.should.equal({ - 'content-type': 'application/xml', - 'content-length': '35a', - }) + headers.should.equal({"content-type": "application/xml", "content-length": "35a"}) @httprettified @within(two=microseconds) def test_httpretty_should_allow_adding_and_overwritting_by_kwargs_u2(now): - ("HTTPretty should allow adding and overwritting headers by " - "keyword args with urllib2") + ( + "HTTPretty should allow adding and overwritting headers by " + "keyword args with urllib2" + ) body = "this is supposed to be the response, indeed" - HTTPretty.register_uri(HTTPretty.GET, "http://github.com/", - body=body, - server='Apache', - content_length=len(body), - content_type='application/json') + HTTPretty.register_uri( + HTTPretty.GET, + "http://github.com/", + body=body, + server="Apache", + content_length=len(body), + content_type="application/json", + ) - request = urlopen('http://github.com') + request = urlopen("http://github.com") headers = dict(request.headers) request.close() request.code.should.equal(200) - headers.should.equal({ - 'content-type': 'application/json', - 'connection': 'close', - 'content-length': str(len(body)), - 'status': '200', - 'server': 'Apache', - 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'), - }) + headers.should.equal( + { + "content-type": "application/json", + "connection": "close", + "content-length": str(len(body)), + "status": "200", + "server": "Apache", + "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"), + } + ) @httprettified @within(two=microseconds) def test_httpretty_should_support_a_list_of_successive_responses_urllib2(now): - ("HTTPretty should support adding a list of successive " - "responses with urllib2") + ("HTTPretty should support adding a list of successive " "responses with urllib2") HTTPretty.register_uri( - HTTPretty.GET, "https://api.yahoo.com/test", + HTTPretty.GET, + "https://api.yahoo.com/test", responses=[ HTTPretty.Response(body="first response", status=201), - HTTPretty.Response(body='second and last response', status=202), - ]) + HTTPretty.Response(body="second and last response", status=202), + ], + ) - request1 = urlopen('https://api.yahoo.com/test') + request1 = urlopen("https://api.yahoo.com/test") body1 = request1.read() request1.close() request1.code.should.equal(201) - body1.should.equal(b'first response') + body1.should.equal(b"first response") - request2 = urlopen('https://api.yahoo.com/test') + request2 = urlopen("https://api.yahoo.com/test") body2 = request2.read() request2.close() request2.code.should.equal(202) - body2.should.equal(b'second and last response') + body2.should.equal(b"second and last response") - request3 = urlopen('https://api.yahoo.com/test') + request3 = urlopen("https://api.yahoo.com/test") body3 = request3.read() request3.close() request3.code.should.equal(202) - body3.should.equal(b'second and last response') + body3.should.equal(b"second and last response") @httprettified @@ -213,27 +231,24 @@ def test_httpretty_should_support_a_list_of_successive_responses_urllib2(now): def test_can_inspect_last_request(now): "HTTPretty.last_request is a mimetools.Message request from last match" - HTTPretty.register_uri(HTTPretty.POST, "http://api.github.com/", - body='{"repositories": ["HTTPretty", "lettuce"]}') + HTTPretty.register_uri( + HTTPretty.POST, + "http://api.github.com/", + body='{"repositories": ["HTTPretty", "lettuce"]}', + ) request = urllib2.Request( - 'http://api.github.com', + "http://api.github.com", b'{"username": "gabrielfalcao"}', - { - 'content-type': 'text/json', - }, + {"content-type": "text/json"}, ) fd = urlopen(request) got = fd.read() fd.close() - HTTPretty.last_request.method.should.equal('POST') - HTTPretty.last_request.body.should.equal( - b'{"username": "gabrielfalcao"}', - ) - HTTPretty.last_request.headers['content-type'].should.equal( - 'text/json', - ) + HTTPretty.last_request.method.should.equal("POST") + HTTPretty.last_request.body.should.equal(b'{"username": "gabrielfalcao"}') + HTTPretty.last_request.headers["content-type"].should.equal("text/json") got.should.equal(b'{"repositories": ["HTTPretty", "lettuce"]}') @@ -242,27 +257,24 @@ def test_can_inspect_last_request(now): def test_can_inspect_last_request_with_ssl(now): "HTTPretty.last_request is recorded even when mocking 'https' (SSL)" - HTTPretty.register_uri(HTTPretty.POST, "https://secure.github.com/", - body='{"repositories": ["HTTPretty", "lettuce"]}') + HTTPretty.register_uri( + HTTPretty.POST, + "https://secure.github.com/", + body='{"repositories": ["HTTPretty", "lettuce"]}', + ) request = urllib2.Request( - 'https://secure.github.com', + "https://secure.github.com", b'{"username": "gabrielfalcao"}', - { - 'content-type': 'text/json', - }, + {"content-type": "text/json"}, ) fd = urlopen(request) got = fd.read() fd.close() - HTTPretty.last_request.method.should.equal('POST') - HTTPretty.last_request.body.should.equal( - b'{"username": "gabrielfalcao"}', - ) - HTTPretty.last_request.headers['content-type'].should.equal( - 'text/json', - ) + HTTPretty.last_request.method.should.equal("POST") + HTTPretty.last_request.body.should.equal(b'{"username": "gabrielfalcao"}') + HTTPretty.last_request.headers["content-type"].should.equal("text/json") got.should.equal(b'{"repositories": ["HTTPretty", "lettuce"]}') @@ -271,47 +283,49 @@ def test_can_inspect_last_request_with_ssl(now): def test_httpretty_ignores_querystrings_from_registered_uri(): "HTTPretty should mock a simple GET with urllib2.read()" - HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/?id=123", - body="Find the best daily deals") + HTTPretty.register_uri( + HTTPretty.GET, "http://yipit.com/?id=123", body="Find the best daily deals" + ) - fd = urlopen('http://yipit.com/?id=123') + fd = urlopen("http://yipit.com/?id=123") got = fd.read() fd.close() - got.should.equal(b'Find the best daily deals') - HTTPretty.last_request.method.should.equal('GET') - HTTPretty.last_request.path.should.equal('/?id=123') + got.should.equal(b"Find the best daily deals") + HTTPretty.last_request.method.should.equal("GET") + HTTPretty.last_request.path.should.equal("/?id=123") @httprettified @within(two=microseconds) def test_callback_response(now): - ("HTTPretty should all a callback function to be set as the body with" - " urllib2") + ("HTTPretty should all a callback function to be set as the body with" " urllib2") def request_callback(request, uri, headers): - return [200, headers, "The {} response from {}".format(decode_utf8(request.method), uri)] + return [ + 200, + headers, + "The {} response from {}".format(decode_utf8(request.method), uri), + ] HTTPretty.register_uri( - HTTPretty.GET, "https://api.yahoo.com/test", - body=request_callback) + HTTPretty.GET, "https://api.yahoo.com/test", body=request_callback + ) - fd = urllib2.urlopen('https://api.yahoo.com/test') + fd = urllib2.urlopen("https://api.yahoo.com/test") got = fd.read() fd.close() got.should.equal(b"The GET response from https://api.yahoo.com/test") HTTPretty.register_uri( - HTTPretty.POST, "https://api.yahoo.com/test_post", - body=request_callback) + HTTPretty.POST, "https://api.yahoo.com/test_post", body=request_callback + ) request = urllib2.Request( "https://api.yahoo.com/test_post", b'{"username": "gabrielfalcao"}', - { - 'content-type': 'text/json', - }, + {"content-type": "text/json"}, ) fd = urllib2.urlopen(request) got = fd.read() @@ -330,9 +344,7 @@ def test_httpretty_should_allow_registering_regexes(): body="Found brand", ) - request = urllib2.Request( - "https://api.yipit.com/v1/deal;brand=GAP", - ) + request = urllib2.Request("https://api.yipit.com/v1/deal;brand=GAP") fd = urllib2.urlopen(request) got = fd.read() fd.close() diff --git a/tests/functional/testserver.py b/tests/functional/testserver.py index 38d2ce4..0a874ea 100644 --- a/tests/functional/testserver.py +++ b/tests/functional/testserver.py @@ -36,6 +36,7 @@ from tornado.httpserver import HTTPServer from tornado.ioloop import IOLoop from httpretty import HTTPretty from httpretty.core import old_socket as true_socket + # from httpretty.compat import PY3 from httpretty.compat import binary_type from httpretty.compat import text_type @@ -44,10 +45,11 @@ from multiprocessing import Process def utf8(s): if isinstance(s, text_type): - s = s.encode('utf-8') + s = s.encode("utf-8") return binary_type(s) + # if not PY3: # bytes = lambda s, *args: str(s) @@ -71,14 +73,14 @@ class TornadoServer(object): @classmethod def get_handlers(cls): - return Application([ - (r"/go-for-bubbles/?", BubblesHandler), - (r"/come-again/?", ComeHandler), - ]) + return Application( + [(r"/go-for-bubbles/?", BubblesHandler), (r"/come-again/?", ComeHandler)] + ) def start(self): def go(app, port, data={}): from httpretty import HTTPretty + HTTPretty.disable() http = HTTPServer(app) @@ -114,10 +116,12 @@ class TCPServer(object): def go(port): from httpretty import HTTPretty + HTTPretty.disable() import socket + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.bind(('localhost', port)) + s.bind(("localhost", port)) s.listen(True) conn, addr = s.accept() @@ -145,11 +149,11 @@ class TCPClient(object): def __init__(self, port): self.port = int(port) self.sock = true_socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.connect(('localhost', self.port)) + self.sock.connect(("localhost", self.port)) def send(self, data): if isinstance(data, text_type): - data = data.encode('utf-8') + data = data.encode("utf-8") self.sock.sendall(data) return self.sock.recv(len(data) + 11) diff --git a/tests/pyopenssl/__init__.py b/tests/pyopenssl/__init__.py index 4eaed8c..ad1391a 100644 --- a/tests/pyopenssl/__init__.py +++ b/tests/pyopenssl/__init__.py @@ -25,4 +25,5 @@ # OTHER DEALINGS IN THE SOFTWARE. import warnings -warnings.simplefilter('ignore') + +warnings.simplefilter("ignore") diff --git a/tests/pyopenssl/test_mock.py b/tests/pyopenssl/test_mock.py index 89320f2..54572fa 100644 --- a/tests/pyopenssl/test_mock.py +++ b/tests/pyopenssl/test_mock.py @@ -34,12 +34,13 @@ from sure import expect @httprettified def test_httpretty_overrides_when_pyopenssl_installed(): - ('HTTPretty should remove PyOpenSSLs urllib3 mock if it is installed') + ("HTTPretty should remove PyOpenSSLs urllib3 mock if it is installed") # And HTTPretty works successfully - HTTPretty.register_uri(HTTPretty.GET, "https://yipit.com/", - body="Find the best daily deals") + HTTPretty.register_uri( + HTTPretty.GET, "https://yipit.com/", body="Find the best daily deals" + ) - response = requests.get('https://yipit.com') - expect(response.text).to.equal('Find the best daily deals') - expect(HTTPretty.last_request.method).to.equal('GET') - expect(HTTPretty.last_request.path).to.equal('/') + response = requests.get("https://yipit.com") + expect(response.text).to.equal("Find the best daily deals") + expect(HTTPretty.last_request.method).to.equal("GET") + expect(HTTPretty.last_request.path).to.equal("/") diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py index 450b021..0758b73 100644 --- a/tests/unit/test_core.py +++ b/tests/unit/test_core.py @@ -19,41 +19,47 @@ class SocketErrorStub(Exception): def test_request_stubs_internals(): - ("HTTPrettyRequest is a BaseHTTPRequestHandler that replaces " - "real socket file descriptors with in-memory ones") + ( + "HTTPrettyRequest is a BaseHTTPRequestHandler that replaces " + "real socket file descriptors with in-memory ones" + ) # Given a valid HTTP request header string - headers = "\r\n".join([ - 'POST /somewhere/?name=foo&age=bar HTTP/1.1', - 'accept-encoding: identity', - 'host: github.com', - 'content-type: application/json', - 'connection: close', - 'user-agent: Python-urllib/2.7', - ]) + headers = "\r\n".join( + [ + "POST /somewhere/?name=foo&age=bar HTTP/1.1", + "accept-encoding: identity", + "host: github.com", + "content-type: application/json", + "connection: close", + "user-agent: Python-urllib/2.7", + ] + ) # When I create a HTTPrettyRequest with an empty body - request = HTTPrettyRequest(headers, body='') + request = HTTPrettyRequest(headers, body="") # Then it should have parsed the headers - dict(request.headers).should.equal({ - 'accept-encoding': 'identity', - 'connection': 'close', - 'content-type': 'application/json', - 'host': 'github.com', - 'user-agent': 'Python-urllib/2.7' - }) + dict(request.headers).should.equal( + { + "accept-encoding": "identity", + "connection": "close", + "content-type": "application/json", + "host": "github.com", + "user-agent": "Python-urllib/2.7", + } + ) # And the `rfile` should be a StringIO - type_as_str = StringIO.__module__ + '.' + StringIO.__name__ + type_as_str = StringIO.__module__ + "." + StringIO.__name__ - request.should.have.property('rfile').being.a(type_as_str) + request.should.have.property("rfile").being.a(type_as_str) # And the `wfile` should be a StringIO - request.should.have.property('wfile').being.a(type_as_str) + request.should.have.property("wfile").being.a(type_as_str) # And the `method` should be available - request.should.have.property('method').being.equal('POST') + request.should.have.property("method").being.equal("POST") def test_request_parse_querystring(): @@ -61,88 +67,92 @@ def test_request_parse_querystring(): # Given a request string containing a unicode encoded querystring - headers = "\r\n".join([ - 'POST /create?name=Gabriel+Falcão HTTP/1.1', - 'Content-Type: multipart/form-data', - ]) + headers = "\r\n".join( + [ + "POST /create?name=Gabriel+Falcão HTTP/1.1", + "Content-Type: multipart/form-data", + ] + ) # When I create a HTTPrettyRequest with an empty body - request = HTTPrettyRequest(headers, body='') + request = HTTPrettyRequest(headers, body="") # Then it should have a parsed querystring - request.querystring.should.equal({'name': ['Gabriel Falcão']}) + request.querystring.should.equal({"name": ["Gabriel Falcão"]}) def test_request_parse_body_when_it_is_application_json(): - ("HTTPrettyRequest#parse_request_body recognizes the " - "content-type `application/json` and parses it") + ( + "HTTPrettyRequest#parse_request_body recognizes the " + "content-type `application/json` and parses it" + ) # Given a request string containing a unicode encoded querystring - headers = "\r\n".join([ - 'POST /create HTTP/1.1', - 'Content-Type: application/json', - ]) + headers = "\r\n".join(["POST /create HTTP/1.1", "Content-Type: application/json"]) # And a valid json body - body = json.dumps({'name': 'Gabriel Falcão'}) + body = json.dumps({"name": "Gabriel Falcão"}) # When I create a HTTPrettyRequest with that data request = HTTPrettyRequest(headers, body) # Then it should have a parsed body - request.parsed_body.should.equal({'name': 'Gabriel Falcão'}) + request.parsed_body.should.equal({"name": "Gabriel Falcão"}) def test_request_parse_body_when_it_is_text_json(): - ("HTTPrettyRequest#parse_request_body recognizes the " - "content-type `text/json` and parses it") + ( + "HTTPrettyRequest#parse_request_body recognizes the " + "content-type `text/json` and parses it" + ) # Given a request string containing a unicode encoded querystring - headers = "\r\n".join([ - 'POST /create HTTP/1.1', - 'Content-Type: text/json', - ]) + headers = "\r\n".join(["POST /create HTTP/1.1", "Content-Type: text/json"]) # And a valid json body - body = json.dumps({'name': 'Gabriel Falcão'}) + body = json.dumps({"name": "Gabriel Falcão"}) # When I create a HTTPrettyRequest with that data request = HTTPrettyRequest(headers, body) # Then it should have a parsed body - request.parsed_body.should.equal({'name': 'Gabriel Falcão'}) + request.parsed_body.should.equal({"name": "Gabriel Falcão"}) def test_request_parse_body_when_it_is_urlencoded(): - ("HTTPrettyRequest#parse_request_body recognizes the " - "content-type `application/x-www-form-urlencoded` and parses it") + ( + "HTTPrettyRequest#parse_request_body recognizes the " + "content-type `application/x-www-form-urlencoded` and parses it" + ) # Given a request string containing a unicode encoded querystring - headers = "\r\n".join([ - 'POST /create HTTP/1.1', - 'Content-Type: application/x-www-form-urlencoded', - ]) + headers = "\r\n".join( + ["POST /create HTTP/1.1", "Content-Type: application/x-www-form-urlencoded"] + ) # And a valid urlencoded body - body = "name=Gabriel+Falcão&age=25&projects=httpretty&projects=sure&projects=lettuce" + body = ( + "name=Gabriel+Falcão&age=25&projects=httpretty&projects=sure&projects=lettuce" + ) # When I create a HTTPrettyRequest with that data request = HTTPrettyRequest(headers, body) # Then it should have a parsed body - request.parsed_body.should.equal({ - 'name': ['Gabriel Falcão'], - 'age': ["25"], - 'projects': ["httpretty", "sure", "lettuce"] - }) + request.parsed_body.should.equal( + { + "name": ["Gabriel Falcão"], + "age": ["25"], + "projects": ["httpretty", "sure", "lettuce"], + } + ) def test_request_parse_body_when_unrecognized(): - ("HTTPrettyRequest#parse_request_body returns the value as " - "is if the Content-Type is not recognized") + ( + "HTTPrettyRequest#parse_request_body returns the value as " + "is if the Content-Type is not recognized" + ) # Given a request string containing a unicode encoded querystring - headers = "\r\n".join([ - 'POST /create HTTP/1.1', - 'Content-Type: whatever', - ]) + headers = "\r\n".join(["POST /create HTTP/1.1", "Content-Type: whatever"]) # And a valid urlencoded body body = "foobar:\nlalala" @@ -154,14 +164,10 @@ def test_request_parse_body_when_unrecognized(): def test_request_string_representation(): - ("HTTPrettyRequest should have a debug-friendly " - "string representation") + ("HTTPrettyRequest should have a debug-friendly " "string representation") # Given a request string containing a unicode encoded querystring - headers = "\r\n".join([ - 'POST /create HTTP/1.1', - 'Content-Type: JPEG-baby', - ]) + headers = "\r\n".join(["POST /create HTTP/1.1", "Content-Type: JPEG-baby"]) # And a valid urlencoded body body = "foobar:\nlalala" @@ -169,12 +175,16 @@ def test_request_string_representation(): request = HTTPrettyRequest(headers, body) # Then its string representation should show the headers and the body - str(request).should.equal('<HTTPrettyRequest("JPEG-baby", total_headers=1, body_length=14)>') + str(request).should.equal( + '<HTTPrettyRequest("JPEG-baby", total_headers=1, body_length=14)>' + ) def test_fake_ssl_socket_proxies_its_ow_socket(): - ("FakeSSLSocket is a simpel wrapper around its own socket, " - "which was designed to be a HTTPretty fake socket") + ( + "FakeSSLSocket is a simpel wrapper around its own socket, " + "which was designed to be a HTTPretty fake socket" + ) # Given a sentinel mock object socket = Mock() @@ -189,7 +199,7 @@ def test_fake_ssl_socket_proxies_its_ow_socket(): socket.send.assert_called_once_with("FOO") -@patch('httpretty.core.datetime') +@patch("httpretty.core.datetime") def test_fakesock_socket_getpeercert(dt): ("fakesock.socket#getpeercert should return a hardcoded fake certificate") # Background: @@ -199,24 +209,27 @@ def test_fakesock_socket_getpeercert(dt): socket = fakesock.socket() # And that it's bound to some host - socket._host = 'somewhere.com' + socket._host = "somewhere.com" # When I retrieve the peer certificate certificate = socket.getpeercert() # Then it should return a hardcoded value - certificate.should.equal({ - u'notAfter': 'Sep 29 04:20:00 GMT', - u'subject': ( - ((u'organizationName', u'*.somewhere.com'),), - ((u'organizationalUnitName', u'Domain Control Validated'),), - ((u'commonName', u'*.somewhere.com'),)), - u'subjectAltName': ( - (u'DNS', u'*.somewhere.com'), - (u'DNS', u'somewhere.com'), - (u'DNS', u'*') - ) - }) + certificate.should.equal( + { + "notAfter": "Sep 29 04:20:00 GMT", + "subject": ( + (("organizationName", "*.somewhere.com"),), + (("organizationalUnitName", "Domain Control Validated"),), + (("commonName", "*.somewhere.com"),), + ), + "subjectAltName": ( + ("DNS", "*.somewhere.com"), + ("DNS", "somewhere.com"), + ("DNS", "*"), + ), + } + ) def test_fakesock_socket_ssl(): @@ -234,31 +247,38 @@ def test_fakesock_socket_ssl(): result.should.equal(sentinel) -@patch('httpretty.core.old_socket') -@patch('httpretty.core.POTENTIAL_HTTP_PORTS') +@patch("httpretty.core.old_socket") +@patch("httpretty.core.POTENTIAL_HTTP_PORTS") def test_fakesock_socket_connect_fallback(POTENTIAL_HTTP_PORTS, old_socket): - ("fakesock.socket#connect should open a real connection if the " - "given port is not a potential http port") + ( + "fakesock.socket#connect should open a real connection if the " + "given port is not a potential http port" + ) # Background: the potential http ports are 80 and 443 - POTENTIAL_HTTP_PORTS.__contains__.side_effect = lambda other: int(other) in (80, 443) + POTENTIAL_HTTP_PORTS.__contains__.side_effect = lambda other: int(other) in ( + 80, + 443, + ) # Given a fake socket instance socket = fakesock.socket() # When it is connected to a remote server in a port that isn't 80 nor 443 - socket.connect(('somewhere.com', 42)) + socket.connect(("somewhere.com", 42)) # Then it should have open a real connection in the background - old_socket.return_value.connect.assert_called_once_with(('somewhere.com', 42)) + old_socket.return_value.connect.assert_called_once_with(("somewhere.com", 42)) # And _closed is set to False socket._closed.should.be.false -@patch('httpretty.core.old_socket') +@patch("httpretty.core.old_socket") def test_fakesock_socket_close(old_socket): - ("fakesock.socket#close should close the actual socket in case " - "it's not http and _closed is False") + ( + "fakesock.socket#close should close the actual socket in case " + "it's not http and _closed is False" + ) # Given a fake socket instance that is synthetically open socket = fakesock.socket() socket._closed = False @@ -274,22 +294,24 @@ def test_fakesock_socket_close(old_socket): socket._closed.should.be.true -@patch('httpretty.core.old_socket') +@patch("httpretty.core.old_socket") def test_fakesock_socket_makefile(old_socket): - ("fakesock.socket#makefile should set the mode, " - "bufsize and return its mocked file descriptor") + ( + "fakesock.socket#makefile should set the mode, " + "bufsize and return its mocked file descriptor" + ) # Given a fake socket that has a mocked Entry associated with it socket = fakesock.socket() socket._entry = Mock() # When I call makefile() - fd = socket.makefile(mode='rw', bufsize=512) + fd = socket.makefile(mode="rw", bufsize=512) # Then it should have returned the socket's own filedescriptor expect(fd).to.equal(socket.fd) # And the mode should have been set in the socket instance - socket._mode.should.equal('rw') + socket._mode.should.equal("rw") # And the bufsize should have been set in the socket instance socket._bufsize.should.equal(512) @@ -297,23 +319,27 @@ def test_fakesock_socket_makefile(old_socket): socket._entry.fill_filekind.assert_called_once_with(fd) -@patch('httpretty.core.old_socket') +@patch("httpretty.core.old_socket") def test_fakesock_socket_real_sendall(old_socket): - ("fakesock.socket#real_sendall calls truesock#connect and bails " - "out when not http") + ( + "fakesock.socket#real_sendall calls truesock#connect and bails " + "out when not http" + ) # Background: the real socket will stop returning bytes after the # first call real_socket = old_socket.return_value - real_socket.recv.side_effect = [b'response from server', b""] + real_socket.recv.side_effect = [b"response from server", b""] # Given a fake socket socket = fakesock.socket() # When I call real_sendall with data, some args and kwargs - socket.real_sendall(b"SOMEDATA", b'some extra args...', foo=b'bar') + socket.real_sendall(b"SOMEDATA", b"some extra args...", foo=b"bar") # Then it should have called sendall in the real socket - real_socket.sendall.assert_called_once_with(b"SOMEDATA", b'some extra args...', foo=b'bar') + real_socket.sendall.assert_called_once_with( + b"SOMEDATA", b"some extra args...", foo=b"bar" + ) # And setblocking was never called real_socket.setblocking.called.should.be.false @@ -322,38 +348,40 @@ def test_fakesock_socket_real_sendall(old_socket): real_socket.recv.called.should.be.false # And the buffer is empty - socket.fd.read().should.equal(b'') + socket.fd.read().should.equal(b"") # And connect was never called real_socket.connect.called.should.be.false -@patch('httpretty.core.old_socket') +@patch("httpretty.core.old_socket") def test_fakesock_socket_real_sendall_when_http(old_socket): - ("fakesock.socket#real_sendall sends data and buffers " - "the response in the file descriptor") + ( + "fakesock.socket#real_sendall sends data and buffers " + "the response in the file descriptor" + ) # Background: the real socket will stop returning bytes after the # first call real_socket = old_socket.return_value - real_socket.recv.side_effect = [b'response from server', b""] + real_socket.recv.side_effect = [b"response from server", b""] # Given a fake socket socket = fakesock.socket() socket.is_http = True # When I call real_sendall with data, some args and kwargs - socket.real_sendall(b"SOMEDATA", b'some extra args...', foo=b'bar') + socket.real_sendall(b"SOMEDATA", b"some extra args...", foo=b"bar") # Then it should have called sendall in the real socket - real_socket.sendall.assert_called_once_with(b"SOMEDATA", b'some extra args...', foo=b'bar') + real_socket.sendall.assert_called_once_with( + b"SOMEDATA", b"some extra args...", foo=b"bar" + ) # And the socket was set to blocking real_socket.setblocking.assert_called_once_with(1) # And recv was called with the bufsize - real_socket.recv.assert_has_calls([ - call(socket._bufsize) - ]) + real_socket.recv.assert_has_calls([call(socket._bufsize)]) # And the buffer should contain the data from the server socket.fd.read().should.equal(b"response from server") @@ -362,33 +390,33 @@ def test_fakesock_socket_real_sendall_when_http(old_socket): real_socket.connect.called.should.be.true -@patch('httpretty.core.old_socket') -@patch('httpretty.core.socket') +@patch("httpretty.core.old_socket") +@patch("httpretty.core.socket") def test_fakesock_socket_real_sendall_continue_eagain_when_http(socket, old_socket): ("fakesock.socket#real_sendall should continue if the socket error was EAGAIN") socket.error = SocketErrorStub # Background: the real socket will stop returning bytes after the # first call real_socket = old_socket.return_value - real_socket.recv.side_effect = [SocketErrorStub(errno.EAGAIN), b'after error', b""] + real_socket.recv.side_effect = [SocketErrorStub(errno.EAGAIN), b"after error", b""] # Given a fake socket socket = fakesock.socket() socket.is_http = True # When I call real_sendall with data, some args and kwargs - socket.real_sendall(b"SOMEDATA", b'some extra args...', foo=b'bar') + socket.real_sendall(b"SOMEDATA", b"some extra args...", foo=b"bar") # Then it should have called sendall in the real socket - real_socket.sendall.assert_called_once_with(b"SOMEDATA", b'some extra args...', foo=b'bar') + real_socket.sendall.assert_called_once_with( + b"SOMEDATA", b"some extra args...", foo=b"bar" + ) # And the socket was set to blocking real_socket.setblocking.assert_called_once_with(1) # And recv was called with the bufsize - real_socket.recv.assert_has_calls([ - call(socket._bufsize) - ]) + real_socket.recv.assert_has_calls([call(socket._bufsize)]) # And the buffer should contain the data from the server socket.fd.read().should.equal(b"after error") @@ -397,25 +425,27 @@ def test_fakesock_socket_real_sendall_continue_eagain_when_http(socket, old_sock real_socket.connect.called.should.be.true -@patch('httpretty.core.old_socket') -@patch('httpretty.core.socket') +@patch("httpretty.core.old_socket") +@patch("httpretty.core.socket") def test_fakesock_socket_real_sendall_socket_error_when_http(socket, old_socket): ("fakesock.socket#real_sendall should continue if the socket error was EAGAIN") socket.error = SocketErrorStub # Background: the real socket will stop returning bytes after the # first call real_socket = old_socket.return_value - real_socket.recv.side_effect = [SocketErrorStub(42), b'after error', ""] + real_socket.recv.side_effect = [SocketErrorStub(42), b"after error", ""] # Given a fake socket socket = fakesock.socket() socket.is_http = True # When I call real_sendall with data, some args and kwargs - socket.real_sendall(b"SOMEDATA", b'some extra args...', foo=b'bar') + socket.real_sendall(b"SOMEDATA", b"some extra args...", foo=b"bar") # Then it should have called sendall in the real socket - real_socket.sendall.assert_called_once_with(b"SOMEDATA", b'some extra args...', foo=b'bar') + real_socket.sendall.assert_called_once_with( + b"SOMEDATA", b"some extra args...", foo=b"bar" + ) # And the socket was set to blocking real_socket.setblocking.assert_called_once_with(1) @@ -430,14 +460,16 @@ def test_fakesock_socket_real_sendall_socket_error_when_http(socket, old_socket) real_socket.connect.called.should.be.true -@patch('httpretty.core.old_socket') -@patch('httpretty.core.POTENTIAL_HTTP_PORTS') -def test_fakesock_socket_real_sendall_when_sending_data(POTENTIAL_HTTP_PORTS, old_socket): +@patch("httpretty.core.old_socket") +@patch("httpretty.core.POTENTIAL_HTTP_PORTS") +def test_fakesock_socket_real_sendall_when_sending_data( + POTENTIAL_HTTP_PORTS, old_socket +): ("fakesock.socket#real_sendall should connect before sending data") # Background: the real socket will stop returning bytes after the # first call real_socket = old_socket.return_value - real_socket.recv.side_effect = [b'response from foobar :)', b""] + real_socket.recv.side_effect = [b"response from foobar :)", b""] # And the potential http port is 4000 POTENTIAL_HTTP_PORTS.__contains__.side_effect = lambda other: int(other) == 4000 @@ -447,107 +479,119 @@ def test_fakesock_socket_real_sendall_when_sending_data(POTENTIAL_HTTP_PORTS, ol socket = fakesock.socket() # When I call connect to a server in a port that is considered HTTP - socket.connect(('foobar.com', 4000)) + socket.connect(("foobar.com", 4000)) # And send some data socket.real_sendall(b"SOMEDATA") # Then connect should have been called - real_socket.connect.assert_called_once_with(('foobar.com', 4000)) + real_socket.connect.assert_called_once_with(("foobar.com", 4000)) # And the socket was set to blocking real_socket.setblocking.assert_called_once_with(1) # And recv was called with the bufsize - real_socket.recv.assert_has_calls([ - call(socket._bufsize) - ]) + real_socket.recv.assert_has_calls([call(socket._bufsize)]) # And the buffer should contain the data from the server socket.fd.read().should.equal(b"response from foobar :)") -@patch('httpretty.core.old_socket') -@patch('httpretty.core.httpretty') -@patch('httpretty.core.POTENTIAL_HTTP_PORTS') -def test_fakesock_socket_sendall_with_valid_requestline(POTENTIAL_HTTP_PORTS, httpretty, old_socket): - ("fakesock.socket#sendall should create an entry if it's given a valid request line") - matcher = Mock(name='matcher') - info = Mock(name='info') +@patch("httpretty.core.old_socket") +@patch("httpretty.core.httpretty") +@patch("httpretty.core.POTENTIAL_HTTP_PORTS") +def test_fakesock_socket_sendall_with_valid_requestline( + POTENTIAL_HTTP_PORTS, httpretty, old_socket +): + ( + "fakesock.socket#sendall should create an entry if it's given a valid request line" + ) + matcher = Mock(name="matcher") + info = Mock(name="info") httpretty.match_uriinfo.return_value = (matcher, info) - httpretty.register_uri(httpretty.GET, 'http://foo.com/foobar') + httpretty.register_uri(httpretty.GET, "http://foo.com/foobar") # Background: # using a subclass of socket that mocks out real_sendall class MySocket(fakesock.socket): def real_sendall(self, data, *args, **kw): - raise AssertionError('should never call this...') + raise AssertionError("should never call this...") # Given an instance of that socket socket = MySocket() # And that is is considered http - socket.connect(('foo.com', 80)) + socket.connect(("foo.com", 80)) # When I try to send data socket.sendall(b"GET /foobar HTTP/1.1\r\nContent-Type: application/json\r\n\r\n") -@patch('httpretty.core.old_socket') -@patch('httpretty.core.httpretty') -@patch('httpretty.core.POTENTIAL_HTTP_PORTS') -def test_fakesock_socket_sendall_with_valid_requestline_2(POTENTIAL_HTTP_PORTS, httpretty, old_socket): - ("fakesock.socket#sendall should create an entry if it's given a valid request line") - matcher = Mock(name='matcher') - info = Mock(name='info') +@patch("httpretty.core.old_socket") +@patch("httpretty.core.httpretty") +@patch("httpretty.core.POTENTIAL_HTTP_PORTS") +def test_fakesock_socket_sendall_with_valid_requestline_2( + POTENTIAL_HTTP_PORTS, httpretty, old_socket +): + ( + "fakesock.socket#sendall should create an entry if it's given a valid request line" + ) + matcher = Mock(name="matcher") + info = Mock(name="info") httpretty.match_uriinfo.return_value = (matcher, info) - httpretty.register_uri(httpretty.GET, 'http://foo.com/foobar') + httpretty.register_uri(httpretty.GET, "http://foo.com/foobar") # Background: # using a subclass of socket that mocks out real_sendall class MySocket(fakesock.socket): def real_sendall(self, data, *args, **kw): - raise AssertionError('should never call this...') + raise AssertionError("should never call this...") # Given an instance of that socket socket = MySocket() # And that is is considered http - socket.connect(('foo.com', 80)) + socket.connect(("foo.com", 80)) # When I try to send data socket.sendall(b"GET /foobar HTTP/1.1\r\nContent-Type: application/json\r\n\r\n") -@patch('httpretty.core.old_socket') -@patch('httpretty.core.POTENTIAL_HTTP_PORTS') -def test_fakesock_socket_sendall_with_body_data_no_entry(POTENTIAL_HTTP_PORTS, old_socket): - ("fakesock.socket#sendall should call real_sendall when not parsing headers and there is no entry") +@patch("httpretty.core.old_socket") +@patch("httpretty.core.POTENTIAL_HTTP_PORTS") +def test_fakesock_socket_sendall_with_body_data_no_entry( + POTENTIAL_HTTP_PORTS, old_socket +): + ( + "fakesock.socket#sendall should call real_sendall when not parsing headers and there is no entry" + ) # Background: # Using a subclass of socket that mocks out real_sendall class MySocket(fakesock.socket): def real_sendall(self, data): - data.should.equal(b'BLABLABLABLA') - return 'cool' + data.should.equal(b"BLABLABLABLA") + return "cool" # Given an instance of that socket socket = MySocket() socket._entry = None # And that is is considered http - socket.connect(('foo.com', 80)) + socket.connect(("foo.com", 80)) # When I try to send data result = socket.sendall(b"BLABLABLABLA") # Then the result should be the return value from real_sendall - result.should.equal('cool') + result.should.equal("cool") -@patch('httpretty.core.old_socket') -@patch('httpretty.core.POTENTIAL_HTTP_PORTS') -def test_fakesock_socket_sendall_with_body_data_with_entry(POTENTIAL_HTTP_PORTS, old_socket): +@patch("httpretty.core.old_socket") +@patch("httpretty.core.POTENTIAL_HTTP_PORTS") +def test_fakesock_socket_sendall_with_body_data_with_entry( + POTENTIAL_HTTP_PORTS, old_socket +): ("fakesock.socket#sendall should call real_sendall when there is no entry") # Background: # Using a subclass of socket that mocks out real_sendall @@ -561,96 +605,110 @@ def test_fakesock_socket_sendall_with_body_data_with_entry(POTENTIAL_HTTP_PORTS, socket = MySocket() # And that is is considered http - socket.connect(('foo.com', 80)) + socket.connect(("foo.com", 80)) # When I try to send data socket.sendall(b"BLABLABLABLA") # Then it should have called real_sendall - data_sent.should.equal([b'BLABLABLABLA']) + data_sent.should.equal([b"BLABLABLABLA"]) -@patch('httpretty.core.httpretty.match_uriinfo') -@patch('httpretty.core.old_socket') -@patch('httpretty.core.POTENTIAL_HTTP_PORTS') -def test_fakesock_socket_sendall_with_body_data_with_chunked_entry(POTENTIAL_HTTP_PORTS, old_socket, match_uriinfo): +@patch("httpretty.core.httpretty.match_uriinfo") +@patch("httpretty.core.old_socket") +@patch("httpretty.core.POTENTIAL_HTTP_PORTS") +def test_fakesock_socket_sendall_with_body_data_with_chunked_entry( + POTENTIAL_HTTP_PORTS, old_socket, match_uriinfo +): ("fakesock.socket#sendall should call real_sendall when not ") # Background: # Using a subclass of socket that mocks out real_sendall class MySocket(fakesock.socket): def real_sendall(self, data): - raise AssertionError('should have never been called') + raise AssertionError("should have never been called") - matcher = Mock(name='matcher') - info = Mock(name='info') + matcher = Mock(name="matcher") + info = Mock(name="info") httpretty.match_uriinfo.return_value = (matcher, info) # Using a mocked entry entry = Mock() - entry.method = 'GET' - entry.info.path = '/foo' + entry.method = "GET" + entry.info.path = "/foo" - entry.request.headers = { - 'transfer-encoding': 'chunked', - } - entry.request.body = b'' + entry.request.headers = {"transfer-encoding": "chunked"} + entry.request.body = b"" # Given an instance of that socket socket = MySocket() socket._entry = entry # And that is is considered http - socket.connect(('foo.com', 80)) + socket.connect(("foo.com", 80)) # When I try to send data socket.sendall(b"BLABLABLABLA") # Then the entry should have that body - httpretty.last_request.body.should.equal(b'BLABLABLABLA') + httpretty.last_request.body.should.equal(b"BLABLABLABLA") def test_URIMatcher_respects_querystring(): ("URIMatcher response querystring") - matcher = URIMatcher('http://www.foo.com/?query=true', None) - info = URIInfo.from_uri('http://www.foo.com/', None) + matcher = URIMatcher("http://www.foo.com/?query=true", None) + info = URIInfo.from_uri("http://www.foo.com/", None) assert matcher.matches(info) - matcher = URIMatcher('http://www.foo.com/?query=true', None, match_querystring=True) - info = URIInfo.from_uri('http://www.foo.com/', None) + matcher = URIMatcher("http://www.foo.com/?query=true", None, match_querystring=True) + info = URIInfo.from_uri("http://www.foo.com/", None) assert not matcher.matches(info) - matcher = URIMatcher('http://www.foo.com/?query=true', None, match_querystring=True) - info = URIInfo.from_uri('http://www.foo.com/?query=true', None) + matcher = URIMatcher("http://www.foo.com/?query=true", None, match_querystring=True) + info = URIInfo.from_uri("http://www.foo.com/?query=true", None) assert matcher.matches(info) - matcher = URIMatcher('http://www.foo.com/?query=true&unquery=false', None, match_querystring=True) - info = URIInfo.from_uri('http://www.foo.com/?unquery=false&query=true', None) + matcher = URIMatcher( + "http://www.foo.com/?query=true&unquery=false", None, match_querystring=True + ) + info = URIInfo.from_uri("http://www.foo.com/?unquery=false&query=true", None) assert matcher.matches(info) - matcher = URIMatcher('http://www.foo.com/?unquery=false&query=true', None, match_querystring=True) - info = URIInfo.from_uri('http://www.foo.com/?query=true&unquery=false', None) + matcher = URIMatcher( + "http://www.foo.com/?unquery=false&query=true", None, match_querystring=True + ) + info = URIInfo.from_uri("http://www.foo.com/?query=true&unquery=false", None) assert matcher.matches(info) def test_URIMatcher_equality_respects_querystring(): ("URIMatcher equality check should check querystring") - matcher_a = URIMatcher('http://www.foo.com/?query=true', None) - matcher_b = URIMatcher('http://www.foo.com/?query=false', None) + matcher_a = URIMatcher("http://www.foo.com/?query=true", None) + matcher_b = URIMatcher("http://www.foo.com/?query=false", None) assert matcher_a == matcher_b - matcher_a = URIMatcher('http://www.foo.com/?query=true', None) - matcher_b = URIMatcher('http://www.foo.com/', None) + matcher_a = URIMatcher("http://www.foo.com/?query=true", None) + matcher_b = URIMatcher("http://www.foo.com/", None) assert matcher_a == matcher_b - matcher_a = URIMatcher('http://www.foo.com/?query=true', None, match_querystring=True) - matcher_b = URIMatcher('http://www.foo.com/?query=false', None, match_querystring=True) + matcher_a = URIMatcher( + "http://www.foo.com/?query=true", None, match_querystring=True + ) + matcher_b = URIMatcher( + "http://www.foo.com/?query=false", None, match_querystring=True + ) assert not matcher_a == matcher_b - matcher_a = URIMatcher('http://www.foo.com/?query=true', None, match_querystring=True) - matcher_b = URIMatcher('http://www.foo.com/', None, match_querystring=True) + matcher_a = URIMatcher( + "http://www.foo.com/?query=true", None, match_querystring=True + ) + matcher_b = URIMatcher("http://www.foo.com/", None, match_querystring=True) assert not matcher_a == matcher_b - matcher_a = URIMatcher('http://www.foo.com/?query=true&unquery=false', None, match_querystring=True) - matcher_b = URIMatcher('http://www.foo.com/?unquery=false&query=true', None, match_querystring=True) + matcher_a = URIMatcher( + "http://www.foo.com/?query=true&unquery=false", None, match_querystring=True + ) + matcher_b = URIMatcher( + "http://www.foo.com/?unquery=false&query=true", None, match_querystring=True + ) assert matcher_a == matcher_b diff --git a/tests/unit/test_httpretty.py b/tests/unit/test_httpretty.py index 3ae4f4f..f072b31 100644 --- a/tests/unit/test_httpretty.py +++ b/tests/unit/test_httpretty.py @@ -50,38 +50,40 @@ Content-Type: %(content_type)s def test_httpretty_should_raise_proper_exception_on_inconsistent_length(): - ("HTTPretty should raise proper exception on inconsistent Content-Length / " - "registered response body") + ( + "HTTPretty should raise proper exception on inconsistent Content-Length / " + "registered response body" + ) HTTPretty.register_uri.when.called_with( HTTPretty.GET, "http://github.com/gabrielfalcao", body="that's me!", - adding_headers={ - 'Content-Length': '999' - } + adding_headers={"Content-Length": "999"}, ).should.have.raised( HTTPrettyError, 'HTTPretty got inconsistent parameters. The header Content-Length you registered expects size "999" ' - 'but the body you registered for that has actually length "10".' + 'but the body you registered for that has actually length "10".', ) def test_httpretty_should_raise_on_socket_send_when_uri_registered(): - ("HTTPretty should raise a RuntimeError when the fakesocket is " - "used in an invalid usage.") + ( + "HTTPretty should raise a RuntimeError when the fakesocket is " + "used in an invalid usage." + ) import socket + HTTPretty.enable() - HTTPretty.register_uri(HTTPretty.GET, - 'http://127.0.0.1:5000') + HTTPretty.register_uri(HTTPretty.GET, "http://127.0.0.1:5000") expect(core.POTENTIAL_HTTP_PORTS).to.be.equal(set([80, 5000])) expect(core.POTENTIAL_HTTPS_PORTS).to.be.equal(set([443])) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(('127.0.0.1', 5000)) - expect(sock.send).when.called_with(b'whatever').to.throw(RuntimeError) + sock.connect(("127.0.0.1", 5000)) + expect(sock.send).when.called_with(b"whatever").to.throw(RuntimeError) sock.close() # restore the previous value @@ -91,17 +93,20 @@ def test_httpretty_should_raise_on_socket_send_when_uri_registered(): def test_httpretty_should_not_raise_on_socket_send_when_uri_not_registered(): - ("HTTPretty should not raise a RuntimeError when the fakesocket " - "is used in an invalid usage.") + ( + "HTTPretty should not raise a RuntimeError when the fakesocket " + "is used in an invalid usage." + ) import socket + HTTPretty.enable() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) sock.setblocking(0) - expect(sock.sendto).when.called_with(b'whatever', - ('127.0.0.1', 53) - ).should_not.throw(RuntimeError) + expect(sock.sendto).when.called_with( + b"whatever", ("127.0.0.1", 53) + ).should_not.throw(RuntimeError) sock.close() HTTPretty.reset() @@ -109,7 +114,7 @@ def test_httpretty_should_not_raise_on_socket_send_when_uri_not_registered(): def test_does_not_have_last_request_by_default(): - 'HTTPretty.last_request is a dummy object by default' + "HTTPretty.last_request is a dummy object by default" HTTPretty.reset() expect(HTTPretty.last_request.headers).to.be.empty @@ -119,93 +124,95 @@ def test_does_not_have_last_request_by_default(): def test_status_codes(): "HTTPretty supports N status codes" - expect(STATUSES).to.equal({ - 100: "Continue", - 101: "Switching Protocols", - 102: "Processing", - 200: "OK", - 201: "Created", - 202: "Accepted", - 203: "Non-Authoritative Information", - 204: "No Content", - 205: "Reset Content", - 206: "Partial Content", - 207: "Multi-Status", - 208: "Already Reported", - 226: "IM Used", - 300: "Multiple Choices", - 301: "Moved Permanently", - 302: "Found", - 303: "See Other", - 304: "Not Modified", - 305: "Use Proxy", - 306: "Switch Proxy", - 307: "Temporary Redirect", - 308: "Permanent Redirect", - 400: "Bad Request", - 401: "Unauthorized", - 402: "Payment Required", - 403: "Forbidden", - 404: "Not Found", - 405: "Method Not Allowed", - 406: "Not Acceptable", - 407: "Proxy Authentication Required", - 408: "Request a Timeout", - 409: "Conflict", - 410: "Gone", - 411: "Length Required", - 412: "Precondition Failed", - 413: "Request Entity Too Large", - 414: "Request-URI Too Long", - 415: "Unsupported Media Type", - 416: "Requested Range Not Satisfiable", - 417: "Expectation Failed", - 418: "I'm a teapot", - 420: "Enhance Your Calm", - 422: "Unprocessable Entity", - 423: "Locked", - 424: "Failed Dependency", - 425: "Unordered Collection", - 426: "Upgrade Required", - 428: "Precondition Required", - 429: "Too Many Requests", - 431: "Request Header Fields Too Large", - 444: "No Response", - 449: "Retry With", - 450: "Blocked by Windows Parental Controls", - 451: "Unavailable For Legal Reasons", - 494: "Request Header Too Large", - 495: "Cert Error", - 496: "No Cert", - 497: "HTTP to HTTPS", - 499: "Client Closed Request", - 500: "Internal Server Error", - 501: "Not Implemented", - 502: "Bad Gateway", - 503: "Service Unavailable", - 504: "Gateway Timeout", - 505: "HTTP Version Not Supported", - 506: "Variant Also Negotiates", - 507: "Insufficient Storage", - 508: "Loop Detected", - 509: "Bandwidth Limit Exceeded", - 510: "Not Extended", - 511: "Network Authentication Required", - 598: "Network read timeout error", - 599: "Network connect timeout error", - }) + expect(STATUSES).to.equal( + { + 100: "Continue", + 101: "Switching Protocols", + 102: "Processing", + 200: "OK", + 201: "Created", + 202: "Accepted", + 203: "Non-Authoritative Information", + 204: "No Content", + 205: "Reset Content", + 206: "Partial Content", + 207: "Multi-Status", + 208: "Already Reported", + 226: "IM Used", + 300: "Multiple Choices", + 301: "Moved Permanently", + 302: "Found", + 303: "See Other", + 304: "Not Modified", + 305: "Use Proxy", + 306: "Switch Proxy", + 307: "Temporary Redirect", + 308: "Permanent Redirect", + 400: "Bad Request", + 401: "Unauthorized", + 402: "Payment Required", + 403: "Forbidden", + 404: "Not Found", + 405: "Method Not Allowed", + 406: "Not Acceptable", + 407: "Proxy Authentication Required", + 408: "Request a Timeout", + 409: "Conflict", + 410: "Gone", + 411: "Length Required", + 412: "Precondition Failed", + 413: "Request Entity Too Large", + 414: "Request-URI Too Long", + 415: "Unsupported Media Type", + 416: "Requested Range Not Satisfiable", + 417: "Expectation Failed", + 418: "I'm a teapot", + 420: "Enhance Your Calm", + 422: "Unprocessable Entity", + 423: "Locked", + 424: "Failed Dependency", + 425: "Unordered Collection", + 426: "Upgrade Required", + 428: "Precondition Required", + 429: "Too Many Requests", + 431: "Request Header Fields Too Large", + 444: "No Response", + 449: "Retry With", + 450: "Blocked by Windows Parental Controls", + 451: "Unavailable For Legal Reasons", + 494: "Request Header Too Large", + 495: "Cert Error", + 496: "No Cert", + 497: "HTTP to HTTPS", + 499: "Client Closed Request", + 500: "Internal Server Error", + 501: "Not Implemented", + 502: "Bad Gateway", + 503: "Service Unavailable", + 504: "Gateway Timeout", + 505: "HTTP Version Not Supported", + 506: "Variant Also Negotiates", + 507: "Insufficient Storage", + 508: "Loop Detected", + 509: "Bandwidth Limit Exceeded", + 510: "Not Extended", + 511: "Network Authentication Required", + 598: "Network read timeout error", + 599: "Network connect timeout error", + } + ) def test_uri_info_full_url(): uri_info = URIInfo( - username='johhny', - password='password', - hostname=b'google.com', + username="johhny", + password="password", + hostname=b"google.com", port=80, - path=b'/', - query=b'foo=bar&baz=test', - fragment='', - scheme='', + path=b"/", + query=b"foo=bar&baz=test", + fragment="", + scheme="", ) expect(uri_info.full_url()).to.equal( @@ -222,24 +229,24 @@ def test_uri_info_eq_ignores_case(): hostname matching. """ uri_info_uppercase = URIInfo( - username='johhny', - password='password', - hostname=b'GOOGLE.COM', + username="johhny", + password="password", + hostname=b"GOOGLE.COM", port=80, - path=b'/', - query=b'foo=bar&baz=test', - fragment='', - scheme='', + path=b"/", + query=b"foo=bar&baz=test", + fragment="", + scheme="", ) uri_info_lowercase = URIInfo( - username='johhny', - password='password', - hostname=b'google.com', + username="johhny", + password="password", + hostname=b"google.com", port=80, - path=b'/', - query=b'foo=bar&baz=test', - fragment='', - scheme='', + path=b"/", + query=b"foo=bar&baz=test", + fragment="", + scheme="", ) expect(uri_info_uppercase).to.equal(uri_info_lowercase) @@ -256,43 +263,48 @@ def test_global_boolean_enabled(): def test_py3kobject_implements_valid__repr__based_on__str__(): class MyObject(BaseClass): def __str__(self): - return 'hi' + return "hi" myobj = MyObject() - expect(repr(myobj)).to.be.equal('hi') + expect(repr(myobj)).to.be.equal("hi") def test_Entry_class_normalizes_headers(): - entry = Entry(HTTPretty.GET, 'http://example.com', 'example', - host='example.com', cache_control='no-cache', x_forward_for='proxy') + entry = Entry( + HTTPretty.GET, + "http://example.com", + "example", + host="example.com", + cache_control="no-cache", + x_forward_for="proxy", + ) - entry.adding_headers.should.equal({ - 'Host': 'example.com', - 'Cache-Control': 'no-cache', - 'X-Forward-For': 'proxy' - }) + entry.adding_headers.should.equal( + {"Host": "example.com", "Cache-Control": "no-cache", "X-Forward-For": "proxy"} + ) def test_Entry_class_counts_multibyte_characters_in_bytes(): - entry = Entry(HTTPretty.GET, 'http://example.com', 'こんにちは') + entry = Entry(HTTPretty.GET, "http://example.com", "こんにちは") buf = FakeSockFile() entry.fill_filekind(buf) response = buf.read() - expect(b'content-length: 15\n').to.be.within(response) + expect(b"content-length: 15\n").to.be.within(response) def test_Entry_class_counts_dynamic(): - result = (200, {}, 'こんにちは'.encode('utf-8')) - entry = Entry(HTTPretty.GET, 'http://example.com', lambda *args: result) - entry.info = URIInfo.from_uri('http://example.com', entry) + result = (200, {}, "こんにちは".encode("utf-8")) + entry = Entry(HTTPretty.GET, "http://example.com", lambda *args: result) + entry.info = URIInfo.from_uri("http://example.com", entry) buf = FakeSockFile() entry.fill_filekind(buf) response = buf.getvalue() - expect(b'content-length: 15\n').to.be.within(response) + expect(b"content-length: 15\n").to.be.within(response) def test_fake_socket_passes_through_setblocking(): import socket + HTTPretty.enable() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.truesock = MagicMock() @@ -302,6 +314,7 @@ def test_fake_socket_passes_through_setblocking(): def test_fake_socket_passes_through_fileno(): import socket + with httpretty.enabled(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.truesock = MagicMock() @@ -311,15 +324,19 @@ def test_fake_socket_passes_through_fileno(): def test_fake_socket_passes_through_getsockopt(): import socket + HTTPretty.enable() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.truesock = MagicMock() - expect(s.getsockopt).called_with(socket.SOL_SOCKET, 1).should_not.throw(AttributeError) + expect(s.getsockopt).called_with(socket.SOL_SOCKET, 1).should_not.throw( + AttributeError + ) s.truesock.getsockopt.assert_called_with(socket.SOL_SOCKET, 1) def test_fake_socket_passes_through_bind(): import socket + HTTPretty.enable() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.truesock = MagicMock() @@ -329,6 +346,7 @@ def test_fake_socket_passes_through_bind(): def test_fake_socket_passes_through_connect_ex(): import socket + HTTPretty.enable() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.truesock = MagicMock() @@ -338,6 +356,7 @@ def test_fake_socket_passes_through_connect_ex(): def test_fake_socket_passes_through_listen(): import socket + HTTPretty.enable() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.truesock = MagicMock() @@ -347,6 +366,7 @@ def test_fake_socket_passes_through_listen(): def test_fake_socket_passes_through_getpeername(): import socket + HTTPretty.enable() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.truesock = MagicMock() @@ -356,6 +376,7 @@ def test_fake_socket_passes_through_getpeername(): def test_fake_socket_passes_through_getsockname(): import socket + HTTPretty.enable() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.truesock = MagicMock() @@ -365,6 +386,7 @@ def test_fake_socket_passes_through_getsockname(): def test_fake_socket_passes_through_gettimeout(): import socket + HTTPretty.enable() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.truesock = MagicMock() @@ -374,6 +396,7 @@ def test_fake_socket_passes_through_gettimeout(): def test_fake_socket_passes_through_shutdown(): import socket + HTTPretty.enable() s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.truesock = MagicMock() @@ -383,11 +406,12 @@ def test_fake_socket_passes_through_shutdown(): def test_unix_socket(): import socket + HTTPretty.enable() # Create a UDS socket sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - server_address = './not-exist-socket' + server_address = "./not-exist-socket" try: sock.connect(server_address) except socket.error: @@ -397,32 +421,32 @@ def test_unix_socket(): def test_HTTPrettyRequest_json_body(): """ A content-type of application/json should parse a valid json body """ - header = TEST_HEADER % {'content_type': 'application/json'} - test_dict = {'hello': 'world'} + header = TEST_HEADER % {"content_type": "application/json"} + test_dict = {"hello": "world"} request = HTTPrettyRequest(header, json.dumps(test_dict)) expect(request.parsed_body).to.equal(test_dict) def test_HTTPrettyRequest_invalid_json_body(): """ A content-type of application/json with an invalid json body should return the content unaltered """ - header = TEST_HEADER % {'content_type': 'application/json'} - invalid_json = u"{'hello', 'world','thisstringdoesntstops}" + header = TEST_HEADER % {"content_type": "application/json"} + invalid_json = "{'hello', 'world','thisstringdoesntstops}" request = HTTPrettyRequest(header, invalid_json) expect(request.parsed_body).to.equal(invalid_json) def test_HTTPrettyRequest_queryparam(): """ A content-type of x-www-form-urlencoded with a valid queryparam body should return parsed content """ - header = TEST_HEADER % {'content_type': 'application/x-www-form-urlencoded'} - valid_queryparam = u"hello=world&this=isavalidquerystring" - valid_results = {'hello': ['world'], 'this': ['isavalidquerystring']} + header = TEST_HEADER % {"content_type": "application/x-www-form-urlencoded"} + valid_queryparam = "hello=world&this=isavalidquerystring" + valid_results = {"hello": ["world"], "this": ["isavalidquerystring"]} request = HTTPrettyRequest(header, valid_queryparam) expect(request.parsed_body).to.equal(valid_results) def test_HTTPrettyRequest_arbitrarypost(): """ A non-handled content type request's post body should return the content unaltered """ - header = TEST_HEADER % {'content_type': 'thisis/notarealcontenttype'} + header = TEST_HEADER % {"content_type": "thisis/notarealcontenttype"} gibberish_body = "1234567890!@#$%^&*()" request = HTTPrettyRequest(header, gibberish_body) expect(request.parsed_body).to.equal(gibberish_body) @@ -435,8 +459,9 @@ def test_socktype_bad_python_version_regression(): https://bugs.python.org/issue20386 """ import socket + someObject = object() - with patch('socket.SocketType', someObject): + with patch("socket.SocketType", someObject): HTTPretty.enable() expect(socket.SocketType).to.equal(someObject) HTTPretty.disable() @@ -444,7 +469,8 @@ def test_socktype_bad_python_version_regression(): def test_socktype_good_python_version(): import socket - with patch('socket.SocketType', socket.socket): + + with patch("socket.SocketType", socket.socket): HTTPretty.enable() expect(socket.SocketType).to.equal(socket.socket) HTTPretty.disable() @@ -454,21 +480,17 @@ def test_httpretty_should_allow_registering_regex_hostnames(): "HTTPretty should allow registering regexes with requests" HTTPretty.register_uri( - HTTPretty.GET, - re.compile(r'^http://\w+\.foo\.com/baz$'), - body="yay", + HTTPretty.GET, re.compile(r"^http://\w+\.foo\.com/baz$"), body="yay" ) - assert HTTPretty.match_http_address('www.foo.com', 80) + assert HTTPretty.match_http_address("www.foo.com", 80) def test_httpretty_should_allow_registering_regex_hostnames_ssl(): "HTTPretty should allow registering regexes with requests (ssl version)" HTTPretty.register_uri( - HTTPretty.GET, - re.compile(r'^https://\w+\.foo\.com/baz$'), - body="yay", + HTTPretty.GET, re.compile(r"^https://\w+\.foo\.com/baz$"), body="yay" ) - assert HTTPretty.match_https_hostname('www.foo.com') + assert HTTPretty.match_https_hostname("www.foo.com") diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index 6ce49d8..1c36f9e 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -6,7 +6,7 @@ import httpretty from httpretty.core import HTTPrettyRequest -@patch('httpretty.httpretty') +@patch("httpretty.httpretty") def test_last_request(original): ("httpretty.last_request() should return httpretty.core.last_request") @@ -14,9 +14,11 @@ def test_last_request(original): def test_has_request(): - ("httpretty.has_request() correctly detects " - "whether or not a request has been made") + ( + "httpretty.has_request() correctly detects " + "whether or not a request has been made" + ) httpretty.reset() httpretty.has_request().should.be.false - with patch('httpretty.httpretty.last_request', return_value=HTTPrettyRequest('')): + with patch("httpretty.httpretty.last_request", return_value=HTTPrettyRequest("")): httpretty.has_request().should.be.true |
