summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabriel Falcão <gabriel@nacaolivre.org>2018-11-04 23:34:43 +0100
committerGabriel Falcão <gabriel@nacaolivre.org>2018-11-04 23:34:43 +0100
commit067c4bda72a442e1771374f9d69a5a1c77c88de5 (patch)
tree907b5303ea0c77fbc64daa67e79920577c8edc2f
parent2314893ec5ab46513e91ab867d7b55a72968909e (diff)
downloadhttpretty-black.tar.gz
prettify code with blackblack
-rw-r--r--docs/source/conf.py70
-rw-r--r--httpretty/compat.py40
-rw-r--r--httpretty/core.py606
-rw-r--r--httpretty/errors.py4
-rw-r--r--httpretty/http.py22
-rw-r--r--httpretty/utils.py6
-rw-r--r--httpretty/version.py2
-rw-r--r--setup.py54
-rw-r--r--tests/functional/__init__.py3
-rw-r--r--tests/functional/base.py30
-rw-r--r--tests/functional/test_bypass.py68
-rw-r--r--tests/functional/test_debug.py24
-rw-r--r--tests/functional/test_decorator.py62
-rw-r--r--tests/functional/test_fakesocket.py23
-rw-r--r--tests/functional/test_httplib2.py309
-rw-r--r--tests/functional/test_passthrough.py16
-rw-r--r--tests/functional/test_requests.py748
-rw-r--r--tests/functional/test_urllib2.py258
-rw-r--r--tests/functional/testserver.py20
-rw-r--r--tests/pyopenssl/__init__.py3
-rw-r--r--tests/pyopenssl/test_mock.py15
-rw-r--r--tests/unit/test_core.py470
-rw-r--r--tests/unit/test_httpretty.py320
-rw-r--r--tests/unit/test_main.py10
24 files changed, 1667 insertions, 1516 deletions
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 0dea7ef..b345c9b 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -2,21 +2,22 @@
import sys
import sphinx_rtd_theme
+
try:
from pathlib2 import Path
except ImportError:
from pathlib import Path
-project_path = Path(__file__).absolute().parent.joinpath('../..')
+project_path = Path(__file__).absolute().parent.joinpath("../..")
sys.path.insert(0, project_path.as_posix())
-from httpretty.version import version # noqa
+from httpretty.version import version # noqa
-project = 'HTTPretty'
-copyright = '2018, Gabriel Falcao'
-author = 'Gabriel Falcao'
+project = "HTTPretty"
+copyright = "2018, Gabriel Falcao"
+author = "Gabriel Falcao"
# The short X.Y version
version = version
@@ -25,47 +26,49 @@ release = version
extensions = [
- 'sphinx.ext.autodoc',
- 'sphinx.ext.doctest',
- 'sphinx.ext.intersphinx',
- 'sphinx.ext.coverage',
- 'sphinx.ext.ifconfig',
- 'sphinx.ext.viewcode',
- 'sphinx.ext.githubpages',
- 'sphinx.ext.autosummary',
+ "sphinx.ext.autodoc",
+ "sphinx.ext.doctest",
+ "sphinx.ext.intersphinx",
+ "sphinx.ext.coverage",
+ "sphinx.ext.ifconfig",
+ "sphinx.ext.viewcode",
+ "sphinx.ext.githubpages",
+ "sphinx.ext.autosummary",
]
-templates_path = ['_templates']
+templates_path = ["_templates"]
-source_suffix = '.rst'
-master_doc = 'index'
+source_suffix = ".rst"
+master_doc = "index"
language = None
exclude_patterns = []
-pygments_style = 'friendly'
+pygments_style = "friendly"
html_theme = "sphinx_rtd_theme"
html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
# html_theme_options = {}
-html_static_path = ['_static']
+html_static_path = ["_static"]
-htmlhelp_basename = 'HTTPrettydoc'
+htmlhelp_basename = "HTTPrettydoc"
latex_elements = {}
latex_documents = [
- (master_doc, 'HTTPretty.tex', 'HTTPretty Documentation',
- 'Gabriel Falcao', 'manual'),
+ (master_doc, "HTTPretty.tex", "HTTPretty Documentation", "Gabriel Falcao", "manual")
]
-man_pages = [
- (master_doc, 'httpretty', 'HTTPretty Documentation',
- [author], 1)
-]
+man_pages = [(master_doc, "httpretty", "HTTPretty Documentation", [author], 1)]
texinfo_documents = [
- (master_doc, 'HTTPretty', 'HTTPretty Documentation',
- author, 'HTTPretty', 'One line description of project.',
- 'Miscellaneous'),
+ (
+ master_doc,
+ "HTTPretty",
+ "HTTPretty Documentation",
+ author,
+ "HTTPretty",
+ "One line description of project.",
+ "Miscellaneous",
+ )
]
@@ -74,11 +77,10 @@ epub_author = author
epub_publisher = author
epub_copyright = copyright
-epub_exclude_files = ['search.html']
+epub_exclude_files = ["search.html"]
intersphinx_mapping = {
- 'python': ('https://docs.python.org/3', None),
-
- 'httplib2': ('https://httplib2.readthedocs.io/en/latest/', None),
- 'requests': ('http://docs.python-requests.org/en/master/', None),
- 'urllib3': ('https://urllib3.readthedocs.io/en/latest/', None),
+ "python": ("https://docs.python.org/3", None),
+ "httplib2": ("https://httplib2.readthedocs.io/en/latest/", None),
+ "requests": ("http://docs.python-requests.org/en/master/", None),
+ "urllib3": ("https://urllib3.readthedocs.io/en/latest/", None),
}
diff --git a/httpretty/compat.py b/httpretty/compat.py
index 969662c..a180593 100644
--- a/httpretty/compat.py
+++ b/httpretty/compat.py
@@ -37,18 +37,18 @@ if PY3: # pragma: no cover
else: # pragma: no cover
import StringIO
+
StringIO = StringIO.StringIO
basestring = string_types
class BaseClass(object):
-
def __repr__(self):
ret = self.__str__()
if PY3: # pragma: no cover
return ret
else:
- return ret.encode('utf-8')
+ return ret.encode("utf-8")
try: # pragma: no cover
@@ -59,25 +59,27 @@ try: # pragma: no cover
from urllib.parse import quote_plus
from urllib.parse import unquote
from urllib.parse import urlencode
+
unquote_utf8 = unquote
def encode_obj(in_obj):
return in_obj
+
+
except ImportError: # pragma: no cover
from urlparse import urlsplit, urlunsplit, parse_qs, unquote
from urllib import quote, quote_plus, urlencode
def unquote_utf8(qs):
if isinstance(qs, text_type):
- qs = qs.encode('utf-8')
+ qs = qs.encode("utf-8")
s = unquote(qs)
if isinstance(s, binary_type):
- return s.decode('utf-8', errors='ignore')
+ return s.decode("utf-8", errors="ignore")
else:
return s
def encode_obj(in_obj):
-
def encode_list(in_list):
out_list = []
for el in in_list:
@@ -91,7 +93,7 @@ except ImportError: # pragma: no cover
return out_dict
if isinstance(in_obj, unicode):
- return in_obj.encode('utf-8')
+ return in_obj.encode("utf-8")
elif isinstance(in_obj, list):
return encode_list(in_obj)
elif isinstance(in_obj, tuple):
@@ -114,17 +116,17 @@ if not PY3: # pragma: no cover
__all__ = [
- 'PY3',
- 'StringIO',
- 'text_type',
- 'binary_type',
- 'BaseClass',
- 'BaseHTTPRequestHandler',
- 'quote',
- 'quote_plus',
- 'urlencode',
- 'urlunsplit',
- 'urlsplit',
- 'parse_qs',
- 'ClassTypes',
+ "PY3",
+ "StringIO",
+ "text_type",
+ "binary_type",
+ "BaseClass",
+ "BaseHTTPRequestHandler",
+ "quote",
+ "quote_plus",
+ "urlencode",
+ "urlunsplit",
+ "urlsplit",
+ "parse_qs",
+ "ClassTypes",
]
diff --git a/httpretty/core.py b/httpretty/core.py
index c847baa..bdd411e 100644
--- a/httpretty/core.py
+++ b/httpretty/core.py
@@ -57,19 +57,11 @@ from .compat import (
parse_qs,
unquote_utf8,
ClassTypes,
- basestring
-)
-from .http import (
- STATUSES,
- HttpBaseClass,
- parse_requestline,
- last_requestline,
+ basestring,
)
+from .http import STATUSES, HttpBaseClass, parse_requestline, last_requestline
-from .utils import (
- utf8,
- decode_utf8,
-)
+from .utils import utf8, decode_utf8
from .errors import HTTPrettyError, UnmockedError
@@ -89,18 +81,20 @@ old_sslwrap_simple = None
old_sslsocket = None
old_sslcontext_wrap_socket = None
-MULTILINE_ANY_REGEX = re.compile(r'.*', re.M)
-hostname_re = re.compile(r'\^?(?:https?://)?[^:/]*[:/]?')
+MULTILINE_ANY_REGEX = re.compile(r".*", re.M)
+hostname_re = re.compile(r"\^?(?:https?://)?[^:/]*[:/]?")
try: # pragma: no cover
import socks
+
old_socksocket = socks.socksocket
except ImportError:
socks = None
try: # pragma: no cover
import ssl
+
old_ssl_wrap_socket = ssl.wrap_socket
try:
old_sslcontext_wrap_socket = ssl.SSLContext.wrap_socket
@@ -115,6 +109,7 @@ except ImportError: # pragma: no cover
try:
import requests.packages.urllib3.connection as requests_urllib3_connection
+
old_requests_ssl_wrap_socket = requests_urllib3_connection.ssl_wrap_socket
except ImportError:
requests_urllib3_connection = None
@@ -163,7 +158,8 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass):
"""
- def __init__(self, headers, body=''):
+
+ def __init__(self, headers, body=""):
# first of all, lets make sure that if headers or body are
# unicode strings, it must be converted into a utf-8 encoded
# byte string
@@ -172,7 +168,7 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass):
# Now let's concatenate the headers with the body, and create
# `rfile` based on it
- self.rfile = StringIO(b'\r\n\r\n'.join([self.raw_headers, self.body]))
+ self.rfile = StringIO(b"\r\n\r\n".join([self.raw_headers, self.body]))
# Creating `wfile` as an empty StringIO, just to avoid any
# real I/O calls
@@ -196,7 +192,7 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass):
# `querystring` holds a dictionary with the parsed query string
try:
- self.path = self.path.encode('iso-8859-1')
+ self.path = self.path.encode("iso-8859-1")
except UnicodeDecodeError:
pass
@@ -233,9 +229,7 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass):
def __str__(self):
tmpl = '<HTTPrettyRequest("{}", total_headers={}, body_length={})>'
return tmpl.format(
- self.headers.get('content-type', ''),
- len(self.headers),
- len(self.body),
+ self.headers.get("content-type", ""), len(self.headers), len(self.body)
)
def parse_querystring(self, qs):
@@ -262,12 +256,12 @@ class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass):
"""
PARSING_FUNCTIONS = {
- 'application/json': json.loads,
- 'text/json': json.loads,
- 'application/x-www-form-urlencoded': self.parse_querystring,
+ "application/json": json.loads,
+ "text/json": json.loads,
+ "application/x-www-form-urlencoded": self.parse_querystring,
}
- content_type = self.headers.get('content-type', '')
+ content_type = self.headers.get("content-type", "")
do_parse = PARSING_FUNCTIONS.get(content_type, FALLBACK_FUNCTION)
try:
@@ -290,7 +284,7 @@ class HTTPrettyRequestEmpty(object):
method = None
url = None
- body = ''
+ body = ""
headers = EmptyRequestHeaders()
@@ -299,12 +293,13 @@ class FakeSockFile(object):
a temporary file, giving it a real file descriptor number.
"""
+
def __init__(self):
self.file = tempfile.TemporaryFile()
self._fileno = self.file.fileno()
def getvalue(self):
- if hasattr(self.file, 'getvalue'):
+ if hasattr(self.file, "getvalue"):
return self.file.getvalue()
else:
return self.file.read()
@@ -323,6 +318,7 @@ class FakeSockFile(object):
class FakeSSLSocket(object):
"""Shorthand for :py:class:`~httpretty.core.fakesock`
"""
+
def __init__(self, sock, *args, **kw):
self._httpretty_sock = sock
@@ -334,18 +330,23 @@ class fakesock(object):
"""
fake :py:mod:`socket`
"""
+
class socket(object):
"""drop-in replacement for :py:class:`socket.socket`
"""
+
_entry = None
debuglevel = 0
_sent_data = []
- def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM,
- protocol=0, _sock=None):
- self.truesock = (old_socket(family, type, protocol)
- if httpretty.allow_net_connect
- else None)
+ def __init__(
+ self, family=socket.AF_INET, type=socket.SOCK_STREAM, protocol=0, _sock=None
+ ):
+ self.truesock = (
+ old_socket(family, type, protocol)
+ if httpretty.allow_net_connect
+ else None
+ )
self._connected_truesock = False
self._closed = True
self.fd = FakeSockFile()
@@ -359,23 +360,16 @@ class fakesock(object):
now = datetime.now()
shift = now + timedelta(days=30 * 12)
return {
- 'notAfter': shift.strftime('%b %d %H:%M:%S GMT'),
- 'subjectAltName': (
- ('DNS', '*.%s' % self._host),
- ('DNS', self._host),
- ('DNS', '*'),
+ "notAfter": shift.strftime("%b %d %H:%M:%S GMT"),
+ "subjectAltName": (
+ ("DNS", "*.%s" % self._host),
+ ("DNS", self._host),
+ ("DNS", "*"),
),
- 'subject': (
- (
- ('organizationName', '*.%s' % self._host),
- ),
- (
- ('organizationalUnitName',
- 'Domain Control Validated'),
- ),
- (
- ('commonName', '*.%s' % self._host),
- ),
+ "subject": (
+ (("organizationName", "*.%s" % self._host),),
+ (("organizationalUnitName", "Domain Control Validated"),),
+ (("commonName", "*.%s" % self._host),),
),
}
@@ -398,8 +392,7 @@ class fakesock(object):
# See issue #206
self.is_http = False
else:
- ports_to_check = (
- POTENTIAL_HTTP_PORTS.union(POTENTIAL_HTTPS_PORTS))
+ ports_to_check = POTENTIAL_HTTP_PORTS.union(POTENTIAL_HTTPS_PORTS)
self.is_http = self._port in ports_to_check
if not self.is_http:
@@ -425,7 +418,7 @@ class fakesock(object):
self._connected_truesock = False
self._closed = True
- def makefile(self, mode='r', bufsize=-1):
+ def makefile(self, mode="r", bufsize=-1):
"""Returns this fake socket's own tempfile buffer.
If there is an entry associated with the socket, the file
@@ -436,9 +429,7 @@ class fakesock(object):
self._bufsize = bufsize
if self._entry:
- t = threading.Thread(
- target=self._entry.fill_filekind, args=(self.fd,)
- )
+ t = threading.Thread(target=self._entry.fill_filekind, args=(self.fd,))
t.start()
if self.timeout == socket._GLOBAL_DEFAULT_TIMEOUT:
timeout = None
@@ -495,12 +486,11 @@ class fakesock(object):
self.fd = FakeSockFile()
self.fd.socket = self
try:
- requestline, _ = data.split(b'\r\n', 1)
- method, path, version = parse_requestline(
- decode_utf8(requestline))
+ requestline, _ = data.split(b"\r\n", 1)
+ method, path, version = parse_requestline(decode_utf8(requestline))
is_parsing_headers = True
except ValueError:
- path = ''
+ path = ""
is_parsing_headers = False
if self._entry is None:
@@ -518,8 +508,12 @@ class fakesock(object):
headers = utf8(last_requestline(self._sent_data))
meta = self._entry.request.headers
body = utf8(self._sent_data[-1])
- if meta.get('transfer-encoding', '') == 'chunked':
- if not body.isdigit() and (body != b'\r\n') and (body != b'0\r\n\r\n'):
+ if meta.get("transfer-encoding", "") == "chunked":
+ if (
+ not body.isdigit()
+ and (body != b"\r\n")
+ and (body != b"0\r\n\r\n")
+ ):
self._entry.request.body += body
else:
self._entry.request.body += body
@@ -530,11 +524,11 @@ class fakesock(object):
# path might come with
s = urlsplit(path)
POTENTIAL_HTTP_PORTS.add(int(s.port or 80))
- parts = list(map(utf8, data.split(b'\r\n\r\n', 1)))
+ parts = list(map(utf8, data.split(b"\r\n\r\n", 1)))
if len(parts) == 2:
headers, body = parts
else:
- headers = ''
+ headers = ""
body = data
request = httpretty.historify_request(headers, body)
@@ -544,7 +538,7 @@ class fakesock(object):
port=self._port,
path=s.path,
query=s.query,
- last_request=request
+ last_request=request,
)
matcher, entries = httpretty.match_uriinfo(info)
@@ -563,8 +557,10 @@ class fakesock(object):
message = [
"HTTPretty intercepted and unexpected socket method call.",
- ("Please open an issue at "
- "'https://github.com/gabrielfalcao/HTTPretty/issues'"),
+ (
+ "Please open an issue at "
+ "'https://github.com/gabrielfalcao/HTTPretty/issues'"
+ ),
"And paste the following traceback:\n",
"".join(decode_utf8(lines)),
]
@@ -577,22 +573,22 @@ class fakesock(object):
self.timeout = new_timeout
def send(self, *args, **kwargs):
- return self.debug('send', *args, **kwargs)
+ return self.debug("send", *args, **kwargs)
def sendto(self, *args, **kwargs):
- return self.debug('sendto', *args, **kwargs)
+ return self.debug("sendto", *args, **kwargs)
def recvfrom_into(self, *args, **kwargs):
- return self.debug('recvfrom_into', *args, **kwargs)
+ return self.debug("recvfrom_into", *args, **kwargs)
def recv_into(self, *args, **kwargs):
- return self.debug('recv_into', *args, **kwargs)
+ return self.debug("recv_into", *args, **kwargs)
def recvfrom(self, *args, **kwargs):
- return self.debug('recvfrom', *args, **kwargs)
+ return self.debug("recvfrom", *args, **kwargs)
def recv(self, *args, **kwargs):
- return self.debug('recv', *args, **kwargs)
+ return self.debug("recv", *args, **kwargs)
def __getattr__(self, name):
if not self.truesock:
@@ -603,21 +599,20 @@ class fakesock(object):
def fake_wrap_socket(orig_wrap_socket_fn, *args, **kw):
"""drop-in replacement for py:func:`ssl.wrap_socket`
"""
- server_hostname = kw.get('server_hostname')
+ server_hostname = kw.get("server_hostname")
if server_hostname is not None:
matcher = httpretty.match_https_hostname(server_hostname)
if matcher is None:
- return orig_wrap_socket_fn(*args, **kw)
- if 'sock' in kw:
- return kw['sock']
+ return orig_wrap_socket_fn(*args, **kw)
+ if "sock" in kw:
+ return kw["sock"]
else:
return args[0]
def create_fake_connection(
- address,
- timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
- source_address=None):
+ address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None
+):
"""drop-in replacement for :py:func:`socket.create_connection`"""
s = fakesock.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
@@ -630,19 +625,17 @@ def create_fake_connection(
def fake_gethostbyname(host):
"""drop-in replacement for :py:func:`socket.gethostbyname`"""
- return '127.0.0.1'
+ return "127.0.0.1"
def fake_gethostname():
"""drop-in replacement for :py:func:`socket.gethostname`"""
- return 'localhost'
+ return "localhost"
-def fake_getaddrinfo(
- host, port, family=None, socktype=None, proto=None, flags=None):
+def fake_getaddrinfo(host, port, family=None, socktype=None, proto=None, flags=None):
"""drop-in replacement for :py:func:`socket.getaddrinfo`"""
- return [(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP,
- '', (host, port))]
+ return [(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", (host, port))]
class Entry(BaseClass):
@@ -661,12 +654,18 @@ class Entry(BaseClass):
.. warning:: When using the ``forcing_headers`` option make sure to add the header ``Content-Length`` otherwise calls using :py:mod:`requests` will try to load the response endlessly.
"""
- def __init__(self, method, uri, body,
- adding_headers=None,
- forcing_headers=None,
- status=200,
- streaming=False,
- **headers):
+
+ def __init__(
+ self,
+ method,
+ uri,
+ body,
+ adding_headers=None,
+ forcing_headers=None,
+ status=200,
+ streaming=False,
+ **headers
+ ):
self.method = method
self.uri = uri
@@ -685,7 +684,7 @@ class Entry(BaseClass):
self.streaming = streaming
if not streaming and not self.body_is_callable:
- self.body_length = len(self.body or '')
+ self.body_length = len(self.body or "")
else:
self.body_length = 0
@@ -703,10 +702,9 @@ class Entry(BaseClass):
"""validates the body size with the value of the ``Content-Length``
header
"""
- content_length_keys = 'Content-Length', 'content-length'
+ content_length_keys = "Content-Length", "content-length"
for key in content_length_keys:
- got = self.adding_headers.get(
- key, self.forcing_headers.get(key, None))
+ got = self.adding_headers.get(key, self.forcing_headers.get(key, None))
if got is None:
continue
@@ -716,26 +714,21 @@ class Entry(BaseClass):
igot = int(got)
except (ValueError, TypeError):
warnings.warn(
- 'HTTPretty got to register the Content-Length header '
- 'with "%r" which is not a number' % got)
+ "HTTPretty got to register the Content-Length header "
+ 'with "%r" which is not a number' % got
+ )
return
if igot and igot > self.body_length:
raise HTTPrettyError(
- 'HTTPretty got inconsistent parameters. The header '
+ "HTTPretty got inconsistent parameters. The header "
'Content-Length you registered expects size "%d" but '
- 'the body you registered for that has actually length '
- '"%d".' % (
- igot, self.body_length,
- )
+ "the body you registered for that has actually length "
+ '"%d".' % (igot, self.body_length)
)
def __str__(self):
- return r'<Entry {} {} getting {}>'.format(
- self.method,
- self.uri,
- self.status
- )
+ return r"<Entry {} {} getting {}>".format(self.method, self.uri, self.status)
def normalize_headers(self, headers):
"""Normalize keys in header names so that ``COntent-tyPe`` becomes ``content-type``
@@ -746,7 +739,7 @@ class Entry(BaseClass):
"""
new = {}
for k in headers:
- new_k = '-'.join([s.lower() for s in k.split('-')])
+ new_k = "-".join([s.lower() for s in k.split("-")])
new[new_k] = headers[k]
return new
@@ -761,62 +754,54 @@ class Entry(BaseClass):
now = datetime.utcnow()
headers = {
- 'status': self.status,
- 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
- 'server': 'Python/HTTPretty',
- 'connection': 'close',
+ "status": self.status,
+ "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"),
+ "server": "Python/HTTPretty",
+ "connection": "close",
}
if self.forcing_headers:
headers = self.forcing_headers
if self.adding_headers:
- headers.update(
- self.normalize_headers(
- self.adding_headers))
+ headers.update(self.normalize_headers(self.adding_headers))
headers = self.normalize_headers(headers)
- status = headers.get('status', self.status)
+ status = headers.get("status", self.status)
if self.body_is_callable:
- status, headers, self.body = self.callable_body(self.request, self.info.full_url(), headers)
+ status, headers, self.body = self.callable_body(
+ self.request, self.info.full_url(), headers
+ )
headers = self.normalize_headers(headers)
# TODO: document this behavior:
- if 'content-length' not in headers:
- headers.update({
- 'content-length': len(self.body)
- })
+ if "content-length" not in headers:
+ headers.update({"content-length": len(self.body)})
- string_list = [
- 'HTTP/1.1 %d %s' % (status, STATUSES[status]),
- ]
+ string_list = ["HTTP/1.1 %d %s" % (status, STATUSES[status])]
- if 'date' in headers:
- string_list.append('date: %s' % headers.pop('date'))
+ if "date" in headers:
+ string_list.append("date: %s" % headers.pop("date"))
if not self.forcing_headers:
- content_type = headers.pop('content-type',
- 'text/plain; charset=utf-8')
+ content_type = headers.pop("content-type", "text/plain; charset=utf-8")
- content_length = headers.pop('content-length',
- self.body_length)
+ content_length = headers.pop("content-length", self.body_length)
- string_list.append('content-type: %s' % content_type)
+ string_list.append("content-type: %s" % content_type)
if not self.streaming:
- string_list.append('content-length: %s' % content_length)
+ string_list.append("content-length: %s" % content_length)
- server = headers.pop('server', None)
+ server = headers.pop("server", None)
if server:
- string_list.append('server: %s' % server)
+ string_list.append("server: %s" % server)
for k, v in headers.items():
- string_list.append(
- '{}: {}'.format(k, v),
- )
+ string_list.append("{}: {}".format(k, v))
for item in string_list:
- fk.write(utf8(item) + b'\n')
+ fk.write(utf8(item) + b"\n")
- fk.write(b'\r\n')
+ fk.write(b"\r\n")
if self.streaming:
self.body, body = itertools.tee(self.body)
@@ -832,11 +817,14 @@ def url_fix(s, charset=None):
"""escapes special characters
"""
if charset:
- warnings.warn("{}.url_fix() charset argument is deprecated".format(__name__), DeprecationWarning)
+ warnings.warn(
+ "{}.url_fix() charset argument is deprecated".format(__name__),
+ DeprecationWarning,
+ )
scheme, netloc, path, querystring, fragment = urlsplit(s)
- path = quote(path, b'/%')
- querystring = quote_plus(querystring, b':&=')
+ path = quote(path, b"/%")
+ querystring = quote_plus(querystring, b":&=")
return urlunsplit((scheme, netloc, path, querystring, fragment))
@@ -855,67 +843,61 @@ class URIInfo(BaseClass):
:param scheme:
:param last_request:
"""
- default_str_attrs = (
- 'username',
- 'password',
- 'hostname',
- 'port',
- 'path',
- )
-
- def __init__(self,
- username='',
- password='',
- hostname='',
- port=80,
- path='/',
- query='',
- fragment='',
- scheme='',
- last_request=None):
-
- self.username = username or ''
- self.password = password or ''
- self.hostname = hostname or ''
+
+ default_str_attrs = ("username", "password", "hostname", "port", "path")
+
+ def __init__(
+ self,
+ username="",
+ password="",
+ hostname="",
+ port=80,
+ path="/",
+ query="",
+ fragment="",
+ scheme="",
+ last_request=None,
+ ):
+
+ self.username = username or ""
+ self.password = password or ""
+ self.hostname = hostname or ""
if port:
port = int(port)
- elif scheme == 'https':
+ elif scheme == "https":
port = 443
self.port = port or 80
- self.path = path or ''
+ self.path = path or ""
if query:
query_items = sorted(parse_qs(query).items())
- self.query = urlencode(
- encode_obj(query_items),
- doseq=True,
- )
+ self.query = urlencode(encode_obj(query_items), doseq=True)
else:
- self.query = ''
+ self.query = ""
if scheme:
self.scheme = scheme
elif self.port in POTENTIAL_HTTPS_PORTS:
- self.scheme = 'https'
+ self.scheme = "https"
else:
- self.scheme = 'http'
- self.fragment = fragment or ''
+ self.scheme = "http"
+ self.fragment = fragment or ""
self.last_request = last_request
def to_str(self, attrs):
- fmt = ", ".join(['%s="%s"' % (k, getattr(self, k, '')) for k in attrs])
- return r'<httpretty.URIInfo(%s)>' % fmt
+ fmt = ", ".join(['%s="%s"' % (k, getattr(self, k, "")) for k in attrs])
+ return r"<httpretty.URIInfo(%s)>" % fmt
def __str__(self):
return self.to_str(self.default_str_attrs)
def str_with_query(self):
- attrs = self.default_str_attrs + ('query',)
+ attrs = self.default_str_attrs + ("query",)
return self.to_str(attrs)
def __hash__(self):
- return int(hashlib.sha1(binary_type(self, 'ascii')).hexdigest(), 16)
+ return int(hashlib.sha1(binary_type(self, "ascii")).hexdigest(), 16)
def __eq__(self, other):
self_tuple = (
@@ -937,8 +919,7 @@ class URIInfo(BaseClass):
"""
credentials = ""
if self.password:
- credentials = "{}:{}@".format(
- self.username, self.password)
+ credentials = "{}:{}@".format(self.username, self.password)
query = ""
if use_querystring and self.query:
@@ -949,7 +930,7 @@ class URIInfo(BaseClass):
credentials=credentials,
domain=self.get_full_domain(),
path=decode_utf8(self.path),
- query=query
+ query=query,
)
return result
@@ -971,19 +952,21 @@ class URIInfo(BaseClass):
:param entry: an instance of :py:class:`~httpretty.core.Entry`
"""
result = urlsplit(uri)
- if result.scheme == 'https':
+ if result.scheme == "https":
POTENTIAL_HTTPS_PORTS.add(int(result.port or 443))
else:
POTENTIAL_HTTP_PORTS.add(int(result.port or 80))
- return cls(result.username,
- result.password,
- result.hostname,
- result.port,
- result.path,
- result.query,
- result.fragment,
- result.scheme,
- entry)
+ return cls(
+ result.username,
+ result.password,
+ result.hostname,
+ result.port,
+ result.path,
+ result.query,
+ result.fragment,
+ result.scheme,
+ entry,
+ )
class URIMatcher(object):
@@ -993,13 +976,12 @@ class URIMatcher(object):
def __init__(self, uri, entries, match_querystring=False, priority=0):
self._match_querystring = match_querystring
# CPython, Jython
- regex_types = ('SRE_Pattern', 'org.python.modules.sre.PatternObject',
- 'Pattern')
+ regex_types = ("SRE_Pattern", "org.python.modules.sre.PatternObject", "Pattern")
is_regex = type(uri).__name__ in regex_types
if is_regex:
self.regex = uri
result = urlsplit(uri.pattern)
- if result.scheme == 'https':
+ if result.scheme == "https":
POTENTIAL_HTTPS_PORTS.add(int(result.port or 443))
else:
POTENTIAL_HTTP_PORTS.add(int(result.port or 80))
@@ -1015,13 +997,16 @@ class URIMatcher(object):
def matches(self, info):
if self.info:
# Query string is not considered when comparing info objects, compare separately
- return self.info == info and (not self._match_querystring or self.info.query == info.query)
+ return self.info == info and (
+ not self._match_querystring or self.info.query == info.query
+ )
else:
- return self.regex.search(info.full_url(
- use_querystring=self._match_querystring))
+ return self.regex.search(
+ info.full_url(use_querystring=self._match_querystring)
+ )
def __str__(self):
- wrap = 'URLMatcher({})'
+ wrap = "URLMatcher({})"
if self.info:
if self._match_querystring:
return wrap.format(text_type(self.info.str_with_query()))
@@ -1045,8 +1030,7 @@ class URIMatcher(object):
self.current_entries[method] = -1
if not self.entries or not entries_for_method:
- raise ValueError('I have no entries for method %s: %s'
- % (method, self))
+ raise ValueError("I have no entries for method %s: %s" % (method, self))
entry = entries_for_method[self.current_entries[method]]
if self.current_entries[method] != -1:
@@ -1071,6 +1055,7 @@ class URIMatcher(object):
class httpretty(HttpBaseClass):
"""manages HTTPretty's internal request/response registry and request matching.
"""
+
_entries = {}
latest_requests = []
@@ -1110,11 +1095,7 @@ class httpretty(HttpBaseClass):
if matcher.info is None:
pattern_with_port = "https://{0}:".format(hostname)
pattern_without_port = "https://{0}/".format(hostname)
- hostname_pattern = (
- hostname_re
- .match(matcher.regex.pattern)
- .group(0)
- )
+ hostname_pattern = hostname_re.match(matcher.regex.pattern).group(0)
for pattern in [pattern_with_port, pattern_without_port]:
if re.match(hostname_pattern, pattern):
return matcher
@@ -1138,30 +1119,25 @@ class httpretty(HttpBaseClass):
for matcher, value in items:
if matcher.info is None:
if port in POTENTIAL_HTTPS_PORTS:
- scheme = 'https://'
+ scheme = "https://"
else:
- scheme = 'http://'
+ scheme = "http://"
pattern_without_port = "{0}{1}/".format(scheme, hostname)
pattern_with_port = "{0}{1}:{2}/".format(scheme, hostname, port)
- hostname_pattern = (
- hostname_re
- .match(matcher.regex.pattern)
- .group(0)
- )
+ hostname_pattern = hostname_re.match(matcher.regex.pattern).group(0)
for pattern in [pattern_with_port, pattern_without_port]:
if re.match(hostname_pattern, pattern):
return matcher
- elif matcher.info.hostname == hostname \
- and matcher.info.port == port:
+ elif matcher.info.hostname == hostname and matcher.info.port == port:
return matcher
return None
@classmethod
@contextlib.contextmanager
- def record(cls, filename, indentation=4, encoding='utf-8'):
+ def record(cls, filename, indentation=4, encoding="utf-8"):
"""
.. testcode::
@@ -1186,8 +1162,7 @@ class httpretty(HttpBaseClass):
import urllib3
except ImportError:
msg = (
- 'HTTPretty requires urllib3 installed '
- 'for recording actual requests.'
+ "HTTPretty requires urllib3 installed " "for recording actual requests."
)
raise RuntimeError(msg)
@@ -1200,28 +1175,30 @@ class httpretty(HttpBaseClass):
cls.disable()
kw = {}
- kw.setdefault('body', request.body)
- kw.setdefault('headers', dict(request.headers))
+ kw.setdefault("body", request.body)
+ kw.setdefault("headers", dict(request.headers))
response = http.request(request.method, uri, **kw)
- calls.append({
- 'request': {
- 'uri': uri,
- 'method': request.method,
- 'headers': dict(request.headers),
- 'body': decode_utf8(request.body),
- 'querystring': request.querystring
- },
- 'response': {
- 'status': response.status,
- 'body': decode_utf8(response.data),
- # urllib3 1.10 had a bug if you just did:
- # dict(response.headers)
- # which would cause all the values to become lists
- # with the header name as the first item and the
- # true value as the second item. Workaround that
- 'headers': dict(response.headers.items())
+ calls.append(
+ {
+ "request": {
+ "uri": uri,
+ "method": request.method,
+ "headers": dict(request.headers),
+ "body": decode_utf8(request.body),
+ "querystring": request.querystring,
+ },
+ "response": {
+ "status": response.status,
+ "body": decode_utf8(response.data),
+ # urllib3 1.10 had a bug if you just did:
+ # dict(response.headers)
+ # which would cause all the values to become lists
+ # with the header name as the first item and the
+ # true value as the second item. Workaround that
+ "headers": dict(response.headers.items()),
+ },
}
- })
+ )
cls.enable()
return response.status, response.headers, response.data
@@ -1230,7 +1207,7 @@ class httpretty(HttpBaseClass):
yield
cls.disable()
- with codecs.open(filename, 'w', encoding) as f:
+ with codecs.open(filename, "w", encoding) as f:
f.write(json.dumps(calls, indent=indentation))
@classmethod
@@ -1257,10 +1234,10 @@ class httpretty(HttpBaseClass):
data = json.loads(open(filename).read())
for item in data:
- uri = item['request']['uri']
- method = item['request']['method']
- body = item['response']['body']
- headers = item['response']['headers']
+ uri = item["request"]["uri"]
+ method = item["request"]["method"]
+ body = item["response"]["body"]
+ headers = item["response"]["headers"]
cls.register_uri(method, uri, body=body, forcing_headers=headers)
yield
@@ -1277,7 +1254,7 @@ class httpretty(HttpBaseClass):
cls.last_request = HTTPrettyRequestEmpty()
@classmethod
- def historify_request(cls, headers, body='', append=True):
+ def historify_request(cls, headers, body="", append=True):
"""appends request to a list for later retrieval
.. testcode::
@@ -1299,14 +1276,19 @@ class httpretty(HttpBaseClass):
return request
@classmethod
- def register_uri(cls, method, uri, body='{"message": "HTTPretty :)"}',
- adding_headers=None,
- forcing_headers=None,
- status=200,
- responses=None,
- match_querystring=False,
- priority=0,
- **headers):
+ def register_uri(
+ cls,
+ method,
+ uri,
+ body='{"message": "HTTPretty :)"}',
+ adding_headers=None,
+ forcing_headers=None,
+ status=200,
+ responses=None,
+ match_querystring=False,
+ priority=0,
+ **headers
+ ):
"""
.. testcode::
@@ -1343,8 +1325,8 @@ class httpretty(HttpBaseClass):
"""
uri_is_string = isinstance(uri, basestring)
- if uri_is_string and re.search(r'^\w+://[^/]+[.]\w{2,}$', uri):
- uri += '/'
+ if uri_is_string and re.search(r"^\w+://[^/]+[.]\w{2,}$", uri):
+ uri += "/"
if isinstance(responses, list) and len(responses) > 0:
for response in responses:
@@ -1352,17 +1334,14 @@ class httpretty(HttpBaseClass):
response.method = method
entries_for_this_uri = responses
else:
- headers[str('body')] = body
- headers[str('adding_headers')] = adding_headers
- headers[str('forcing_headers')] = forcing_headers
- headers[str('status')] = status
+ headers[str("body")] = body
+ headers[str("adding_headers")] = adding_headers
+ headers[str("forcing_headers")] = forcing_headers
+ headers[str("status")] = status
- entries_for_this_uri = [
- cls.Response(method=method, uri=uri, **headers),
- ]
+ entries_for_this_uri = [cls.Response(method=method, uri=uri, **headers)]
- matcher = URIMatcher(uri, entries_for_this_uri,
- match_querystring, priority)
+ matcher = URIMatcher(uri, entries_for_this_uri, match_querystring, priority)
if matcher in cls._entries:
matcher.entries.extend(cls._entries[matcher])
del cls._entries[matcher]
@@ -1370,18 +1349,20 @@ class httpretty(HttpBaseClass):
cls._entries[matcher] = entries_for_this_uri
def __str__(self):
- return '<HTTPretty with %d URI entries>' % len(self._entries)
+ return "<HTTPretty with %d URI entries>" % len(self._entries)
@classmethod
def Response(
- cls, body,
- method=None,
- uri=None,
- adding_headers=None,
- forcing_headers=None,
- status=200,
- streaming=False,
- **kw):
+ cls,
+ body,
+ method=None,
+ uri=None,
+ adding_headers=None,
+ forcing_headers=None,
+ status=200,
+ streaming=False,
+ **kw
+ ):
"""
shortcut to create an :py:class:`~httpretty.core.Entry` that takes the body as first positional argument
@@ -1396,11 +1377,11 @@ class httpretty(HttpBaseClass):
:param kw: keyword-arguments passed onto the :py:class:`~httpretty.core.Entry`
:returns: an :py:class:`~httpretty.core.Entry`
"""
- kw['body'] = body
- kw['adding_headers'] = adding_headers
- kw['forcing_headers'] = forcing_headers
- kw['status'] = int(status)
- kw['streaming'] = streaming
+ kw["body"] = body
+ kw["adding_headers"] = adding_headers
+ kw["forcing_headers"] = forcing_headers
+ kw["status"] = int(status)
+ kw["streaming"] = streaming
return Entry(method, uri, **kw)
@classmethod
@@ -1434,18 +1415,18 @@ class httpretty(HttpBaseClass):
socket.gethostbyname = old_gethostbyname
socket.getaddrinfo = old_getaddrinfo
- socket.__dict__['socket'] = old_socket
- socket.__dict__['_socketobject'] = old_socket
- socket.__dict__['SocketType'] = old_SocketType
+ socket.__dict__["socket"] = old_socket
+ socket.__dict__["_socketobject"] = old_socket
+ socket.__dict__["SocketType"] = old_SocketType
- socket.__dict__['create_connection'] = old_create_connection
- socket.__dict__['gethostname'] = old_gethostname
- socket.__dict__['gethostbyname'] = old_gethostbyname
- socket.__dict__['getaddrinfo'] = old_getaddrinfo
+ socket.__dict__["create_connection"] = old_create_connection
+ socket.__dict__["gethostname"] = old_gethostname
+ socket.__dict__["gethostbyname"] = old_gethostbyname
+ socket.__dict__["getaddrinfo"] = old_getaddrinfo
if socks:
socks.socksocket = old_socksocket
- socks.__dict__['socksocket'] = old_socksocket
+ socks.__dict__["socksocket"] = old_socksocket
if ssl:
ssl.wrap_socket = old_ssl_wrap_socket
@@ -1454,18 +1435,18 @@ class httpretty(HttpBaseClass):
ssl.SSLContext.wrap_socket = old_sslcontext_wrap_socket
except AttributeError:
pass
- ssl.__dict__['wrap_socket'] = old_ssl_wrap_socket
- ssl.__dict__['SSLSocket'] = old_sslsocket
+ ssl.__dict__["wrap_socket"] = old_ssl_wrap_socket
+ ssl.__dict__["SSLSocket"] = old_sslsocket
if not PY3:
ssl.sslwrap_simple = old_sslwrap_simple
- ssl.__dict__['sslwrap_simple'] = old_sslwrap_simple
+ ssl.__dict__["sslwrap_simple"] = old_sslwrap_simple
if requests_urllib3_connection is not None:
- requests_urllib3_connection.ssl_wrap_socket = \
- old_requests_ssl_wrap_socket
- requests_urllib3_connection.__dict__['ssl_wrap_socket'] = \
- old_requests_ssl_wrap_socket
+ requests_urllib3_connection.ssl_wrap_socket = old_requests_ssl_wrap_socket
+ requests_urllib3_connection.__dict__[
+ "ssl_wrap_socket"
+ ] = old_requests_ssl_wrap_socket
@classmethod
def is_enabled(cls):
@@ -1516,7 +1497,7 @@ class httpretty(HttpBaseClass):
cls._is_enabled = True
# Some versions of python internally shadowed the
# SocketType variable incorrectly https://bugs.python.org/issue20386
- bad_socket_shadow = (socket.socket != socket.SocketType)
+ bad_socket_shadow = socket.socket != socket.SocketType
socket.socket = fakesock.socket
socket._socketobject = fakesock.socket
@@ -1528,40 +1509,42 @@ class httpretty(HttpBaseClass):
socket.gethostbyname = fake_gethostbyname
socket.getaddrinfo = fake_getaddrinfo
- socket.__dict__['socket'] = fakesock.socket
- socket.__dict__['_socketobject'] = fakesock.socket
+ socket.__dict__["socket"] = fakesock.socket
+ socket.__dict__["_socketobject"] = fakesock.socket
if not bad_socket_shadow:
- socket.__dict__['SocketType'] = fakesock.socket
+ socket.__dict__["SocketType"] = fakesock.socket
- socket.__dict__['create_connection'] = create_fake_connection
- socket.__dict__['gethostname'] = fake_gethostname
- socket.__dict__['gethostbyname'] = fake_gethostbyname
- socket.__dict__['getaddrinfo'] = fake_getaddrinfo
+ socket.__dict__["create_connection"] = create_fake_connection
+ socket.__dict__["gethostname"] = fake_gethostname
+ socket.__dict__["gethostbyname"] = fake_gethostbyname
+ socket.__dict__["getaddrinfo"] = fake_getaddrinfo
if socks:
socks.socksocket = fakesock.socket
- socks.__dict__['socksocket'] = fakesock.socket
+ socks.__dict__["socksocket"] = fakesock.socket
if ssl:
new_wrap = partial(fake_wrap_socket, old_ssl_wrap_socket)
ssl.wrap_socket = new_wrap
ssl.SSLSocket = FakeSSLSocket
try:
- ssl.SSLContext.wrap_socket = partial(fake_wrap_socket, old_sslcontext_wrap_socket)
+ ssl.SSLContext.wrap_socket = partial(
+ fake_wrap_socket, old_sslcontext_wrap_socket
+ )
except AttributeError:
pass
- ssl.__dict__['wrap_socket'] = new_wrap
- ssl.__dict__['SSLSocket'] = FakeSSLSocket
+ ssl.__dict__["wrap_socket"] = new_wrap
+ ssl.__dict__["SSLSocket"] = FakeSSLSocket
if not PY3:
ssl.sslwrap_simple = new_wrap
- ssl.__dict__['sslwrap_simple'] = new_wrap
+ ssl.__dict__["sslwrap_simple"] = new_wrap
if requests_urllib3_connection is not None:
new_wrap = partial(fake_wrap_socket, old_requests_ssl_wrap_socket)
requests_urllib3_connection.ssl_wrap_socket = new_wrap
- requests_urllib3_connection.__dict__['ssl_wrap_socket'] = new_wrap
+ requests_urllib3_connection.__dict__["ssl_wrap_socket"] = new_wrap
class httprettized(object):
@@ -1579,6 +1562,7 @@ class httprettized(object):
assert httpretty.latest_requests[-1].url == 'https://httpbin.org/ip'
assert response.json() == {'origin': '42.42.42.42'}
"""
+
def __init__(self, allow_net_connect=True):
self.allow_net_connect = allow_net_connect
@@ -1637,15 +1621,14 @@ def httprettified(test=None, allow_net_connect=True):
})
"""
+
def decorate_unittest_TestCase_setUp(klass):
# Prefer addCleanup (added in python 2.7), but fall back
# to using tearDown if it isn't available
- use_addCleanup = hasattr(klass, 'addCleanup')
+ use_addCleanup = hasattr(klass, "addCleanup")
- original_setUp = (klass.setUp
- if hasattr(klass, 'setUp')
- else None)
+ original_setUp = klass.setUp if hasattr(klass, "setUp") else None
def new_setUp(self):
httpretty.reset()
@@ -1654,25 +1637,25 @@ def httprettified(test=None, allow_net_connect=True):
self.addCleanup(httpretty.disable)
if original_setUp:
original_setUp(self)
+
klass.setUp = new_setUp
if not use_addCleanup:
- original_tearDown = (klass.setUp
- if hasattr(klass, 'tearDown')
- else None)
+ original_tearDown = klass.setUp if hasattr(klass, "tearDown") else None
def new_tearDown(self):
httpretty.disable()
httpretty.reset()
if original_tearDown:
original_tearDown(self)
+
klass.tearDown = new_tearDown
return klass
def decorate_test_methods(klass):
for attr in dir(klass):
- if not attr.startswith('test_'):
+ if not attr.startswith("test_"):
continue
attr_value = getattr(klass, attr)
@@ -1685,11 +1668,13 @@ def httprettified(test=None, allow_net_connect=True):
def is_unittest_TestCase(klass):
try:
import unittest
+
return issubclass(klass, unittest.TestCase)
except ImportError:
return False
"A decorator for tests that use HTTPretty"
+
def decorate_class(klass):
if is_unittest_TestCase(klass):
return decorate_unittest_TestCase_setUp(klass)
@@ -1700,6 +1685,7 @@ def httprettified(test=None, allow_net_connect=True):
def wrapper(*args, **kw):
with httprettized(allow_net_connect):
return test(*args, **kw)
+
return wrapper
if isinstance(test, ClassTypes):
diff --git a/httpretty/errors.py b/httpretty/errors.py
index c69a438..1f1b94f 100644
--- a/httpretty/errors.py
+++ b/httpretty/errors.py
@@ -34,6 +34,6 @@ class HTTPrettyError(Exception):
class UnmockedError(HTTPrettyError):
def __init__(self):
super(UnmockedError, self).__init__(
- 'No mocking was registered, and real connections are '
- 'not allowed (httpretty.allow_net_connect = False).'
+ "No mocking was registered, and real connections are "
+ "not allowed (httpretty.allow_net_connect = False)."
)
diff --git a/httpretty/http.py b/httpretty/http.py
index 79dac73..7e323ac 100644
--- a/httpretty/http.py
+++ b/httpretty/http.py
@@ -108,14 +108,14 @@ STATUSES = {
class HttpBaseClass(BaseClass):
- GET = 'GET'
- PUT = 'PUT'
- POST = 'POST'
- DELETE = 'DELETE'
- HEAD = 'HEAD'
- PATCH = 'PATCH'
- OPTIONS = 'OPTIONS'
- CONNECT = 'CONNECT'
+ GET = "GET"
+ PUT = "PUT"
+ POST = "POST"
+ DELETE = "DELETE"
+ HEAD = "HEAD"
+ PATCH = "PATCH"
+ OPTIONS = "OPTIONS"
+ CONNECT = "CONNECT"
METHODS = (GET, PUT, POST, DELETE, HEAD, PATCH, OPTIONS, CONNECT)
@@ -132,12 +132,12 @@ def parse_requestline(s):
...
ValueError: Not a Request-Line
"""
- methods = '|'.join(HttpBaseClass.METHODS)
- m = re.match(r'(' + methods + r')\s+(.*)\s+HTTP/(1.[0|1])', s, re.I)
+ methods = "|".join(HttpBaseClass.METHODS)
+ m = re.match(r"(" + methods + r")\s+(.*)\s+HTTP/(1.[0|1])", s, re.I)
if m:
return m.group(1).upper(), m.group(2), m.group(3)
else:
- raise ValueError('Not a Request-Line')
+ raise ValueError("Not a Request-Line")
def last_requestline(sent_data):
diff --git a/httpretty/utils.py b/httpretty/utils.py
index c203796..5c0fa13 100644
--- a/httpretty/utils.py
+++ b/httpretty/utils.py
@@ -25,14 +25,12 @@
# OTHER DEALINGS IN THE SOFTWARE.
from __future__ import unicode_literals
-from .compat import (
- binary_type, text_type
-)
+from .compat import binary_type, text_type
def utf8(s):
if isinstance(s, text_type):
- s = s.encode('utf-8')
+ s = s.encode("utf-8")
elif s is None:
return binary_type()
diff --git a/httpretty/version.py b/httpretty/version.py
index cd558f2..71353ea 100644
--- a/httpretty/version.py
+++ b/httpretty/version.py
@@ -1 +1 @@
-version = '0.9.5'
+version = "0.9.5"
diff --git a/setup.py b/setup.py
index 53c1955..a2a83e6 100644
--- a/setup.py
+++ b/setup.py
@@ -31,44 +31,44 @@ from setuptools import setup, find_packages
def read_version():
ctx = {}
- exec(local_file('httpretty', 'version.py'), ctx)
- return ctx['version']
+ exec(local_file("httpretty", "version.py"), ctx)
+ return ctx["version"]
-local_file = lambda *f: \
- io.open(
- os.path.join(os.path.dirname(__file__), *f), encoding='utf-8').read()
+local_file = lambda *f: io.open(
+ os.path.join(os.path.dirname(__file__), *f), encoding="utf-8"
+).read()
-install_requires = ['six']
-tests_requires = ['nose', 'sure', 'coverage', 'mock', 'rednose']
+install_requires = ["six"]
+tests_requires = ["nose", "sure", "coverage", "mock", "rednose"]
setup(
- name='httpretty',
+ name="httpretty",
version=read_version(),
- description='HTTP client mock for Python',
- long_description=local_file('README.rst'),
- author='Gabriel Falcao',
- author_email='gabriel@nacaolivre.org',
- url='https://httpretty.readthedocs.io',
+ description="HTTP client mock for Python",
+ long_description=local_file("README.rst"),
+ author="Gabriel Falcao",
+ author_email="gabriel@nacaolivre.org",
+ url="https://httpretty.readthedocs.io",
zip_safe=False,
- packages=find_packages(exclude=['*tests*']),
+ packages=find_packages(exclude=["*tests*"]),
tests_require=tests_requires,
install_requires=install_requires,
- license='MIT',
- test_suite='nose.collector',
+ license="MIT",
+ test_suite="nose.collector",
classifiers=[
- 'Development Status :: 5 - Production/Stable',
- 'Intended Audience :: Developers',
- 'License :: OSI Approved :: MIT License',
- 'Programming Language :: Python :: 2',
- 'Programming Language :: Python :: 2.7',
- 'Programming Language :: Python :: 3',
- 'Programming Language :: Python :: 3.6',
- 'Programming Language :: Python :: 3.7',
- 'Programming Language :: Python',
- 'Topic :: Internet :: WWW/HTTP',
- 'Topic :: Software Development :: Testing'
+ "Development Status :: 5 - Production/Stable",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: MIT License",
+ "Programming Language :: Python :: 2",
+ "Programming Language :: Python :: 2.7",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.6",
+ "Programming Language :: Python :: 3.7",
+ "Programming Language :: Python",
+ "Topic :: Internet :: WWW/HTTP",
+ "Topic :: Software Development :: Testing",
],
)
diff --git a/tests/functional/__init__.py b/tests/functional/__init__.py
index 4eaed8c..ad1391a 100644
--- a/tests/functional/__init__.py
+++ b/tests/functional/__init__.py
@@ -25,4 +25,5 @@
# OTHER DEALINGS IN THE SOFTWARE.
import warnings
-warnings.simplefilter('ignore')
+
+warnings.simplefilter("ignore")
diff --git a/tests/functional/base.py b/tests/functional/base.py
index 61bd89f..884c483 100644
--- a/tests/functional/base.py
+++ b/tests/functional/base.py
@@ -44,28 +44,33 @@ def get_free_tcp_port():
"""returns a TCP port that can be used for listen in the host.
"""
tcp = old_socket(socket.AF_INET, socket.SOCK_STREAM)
- tcp.bind(('', 0))
+ tcp.bind(("", 0))
host, port = tcp.getsockname()
tcp.close()
return port
LOCAL_FILE = lambda *path: join(abspath(dirname(__file__)), *path)
-FIXTURE_FILE = lambda name: LOCAL_FILE('fixtures', name)
+FIXTURE_FILE = lambda name: LOCAL_FILE("fixtures", name)
class JSONEchoHandler(tornado.web.RequestHandler):
def get(self, matched):
payload = dict([(x, self.get_argument(x)) for x in self.request.arguments])
- self.write(json.dumps({matched or 'index': payload}, indent=4))
+ self.write(json.dumps({matched or "index": payload}, indent=4))
def post(self, matched):
payload = dict(self.request.arguments)
- self.write(json.dumps({
- matched or 'index': payload,
- 'req_body': self.request.body.decode('utf-8'),
- 'req_headers': dict(self.request.headers.items()),
- }, indent=4))
+ self.write(
+ json.dumps(
+ {
+ matched or "index": payload,
+ "req_body": self.request.body.decode("utf-8"),
+ "req_headers": dict(self.request.headers.items()),
+ },
+ indent=4,
+ )
+ )
class JSONEchoServer(threading.Thread):
@@ -83,9 +88,7 @@ class JSONEchoServer(threading.Thread):
return self._stop.isSet()
def setup_application(self):
- return tornado.web.Application([
- (r"/(.*)", JSONEchoHandler),
- ])
+ return tornado.web.Application([(r"/(.*)", JSONEchoHandler)])
def run(self):
loop = tornado.ioloop.IOLoop()
@@ -102,9 +105,9 @@ def use_tornado_server(callback):
@wraps(callback)
def func(*args, **kw):
- port = os.getenv('TEST_PORT', get_free_tcp_port())
+ port = os.getenv("TEST_PORT", get_free_tcp_port())
POTENTIAL_HTTP_PORTS.add(port)
- kw['port'] = port
+ kw["port"] = port
server = JSONEchoServer(lock, port)
server.start()
try:
@@ -115,4 +118,5 @@ def use_tornado_server(callback):
server.stop()
if port in POTENTIAL_HTTP_PORTS:
POTENTIAL_HTTP_PORTS.remove(port)
+
return func
diff --git a/tests/functional/test_bypass.py b/tests/functional/test_bypass.py
index 6446fe9..bd31454 100644
--- a/tests/functional/test_bypass.py
+++ b/tests/functional/test_bypass.py
@@ -27,6 +27,7 @@
from __future__ import unicode_literals
import time
import requests
+
try:
import urllib.request as urllib2
except ImportError:
@@ -58,7 +59,7 @@ def start_http_server(context):
httpretty.disable()
time.sleep(.1)
try:
- requests.get('http://localhost:{}/'.format(context.http_port))
+ requests.get("http://localhost:{}/".format(context.http_port))
ready = True
except (Exception, BaseException):
if time.time() - started_at >= timeout:
@@ -92,31 +93,38 @@ def test_httpretty_bypasses_when_disabled(context):
"httpretty should bypass all requests by disabling it"
httpretty.register_uri(
- httpretty.GET, "http://localhost:{}/go-for-bubbles/".format(context.http_port),
- body="glub glub")
+ httpretty.GET,
+ "http://localhost:{}/go-for-bubbles/".format(context.http_port),
+ body="glub glub",
+ )
httpretty.disable()
- fd = urllib2.urlopen('http://localhost:{}/go-for-bubbles/'.format(context.http_port))
+ fd = urllib2.urlopen(
+ "http://localhost:{}/go-for-bubbles/".format(context.http_port)
+ )
got1 = fd.read()
fd.close()
expect(got1).to.equal(
- b'. o O 0 O o . o O 0 O o . o O 0 O o . o O 0 O o . o O 0 O o .')
+ b". o O 0 O o . o O 0 O o . o O 0 O o . o O 0 O o . o O 0 O o ."
+ )
- fd = urllib2.urlopen('http://localhost:{}/come-again/'.format(context.http_port))
+ fd = urllib2.urlopen("http://localhost:{}/come-again/".format(context.http_port))
got2 = fd.read()
fd.close()
- expect(got2).to.equal(b'<- HELLO WORLD ->')
+ expect(got2).to.equal(b"<- HELLO WORLD ->")
httpretty.enable()
- fd = urllib2.urlopen('http://localhost:{}/go-for-bubbles/'.format(context.http_port))
+ fd = urllib2.urlopen(
+ "http://localhost:{}/go-for-bubbles/".format(context.http_port)
+ )
got3 = fd.read()
fd.close()
- expect(got3).to.equal(b'glub glub')
+ expect(got3).to.equal(b"glub glub")
core.POTENTIAL_HTTP_PORTS.remove(context.http_port)
@@ -126,20 +134,24 @@ def test_httpretty_bypasses_a_unregistered_request(context):
"httpretty should bypass a unregistered request by disabling it"
httpretty.register_uri(
- httpretty.GET, "http://localhost:{}/go-for-bubbles/".format(context.http_port),
- body="glub glub")
-
- fd = urllib2.urlopen('http://localhost:{}/go-for-bubbles/'.format(context.http_port))
+ httpretty.GET,
+ "http://localhost:{}/go-for-bubbles/".format(context.http_port),
+ body="glub glub",
+ )
+
+ fd = urllib2.urlopen(
+ "http://localhost:{}/go-for-bubbles/".format(context.http_port)
+ )
got1 = fd.read()
fd.close()
- expect(got1).to.equal(b'glub glub')
+ expect(got1).to.equal(b"glub glub")
- fd = urllib2.urlopen('http://localhost:{}/come-again/'.format(context.http_port))
+ fd = urllib2.urlopen("http://localhost:{}/come-again/".format(context.http_port))
got2 = fd.read()
fd.close()
- expect(got2).to.equal(b'<- HELLO WORLD ->')
+ expect(got2).to.equal(b"<- HELLO WORLD ->")
core.POTENTIAL_HTTP_PORTS.remove(context.http_port)
@@ -148,15 +160,13 @@ def test_httpretty_bypasses_a_unregistered_request(context):
def test_using_httpretty_with_other_tcp_protocols(context):
"httpretty should work even when testing code that also use other TCP-based protocols"
- httpretty.register_uri(
- httpretty.GET, "http://falcao.it/foo/",
- body="BAR")
+ httpretty.register_uri(httpretty.GET, "http://falcao.it/foo/", body="BAR")
- fd = urllib2.urlopen('http://falcao.it/foo/')
+ fd = urllib2.urlopen("http://falcao.it/foo/")
got1 = fd.read()
fd.close()
- expect(got1).to.equal(b'BAR')
+ expect(got1).to.equal(b"BAR")
expect(context.client.send("foobar")).to.equal(b"RECEIVED: foobar")
@@ -168,13 +178,14 @@ def test_disallow_net_connect_1(context):
When allow_net_connect = False, a request that otherwise
would have worked results in UnmockedError.
"""
- httpretty.register_uri(httpretty.GET, "http://falcao.it/foo/",
- body="BAR")
+ httpretty.register_uri(httpretty.GET, "http://falcao.it/foo/", body="BAR")
def foo():
fd = None
try:
- fd = urllib2.urlopen('http://localhost:{}/go-for-bubbles/'.format(context.http_port))
+ fd = urllib2.urlopen(
+ "http://localhost:{}/go-for-bubbles/".format(context.http_port)
+ )
finally:
if fd:
fd.close()
@@ -192,7 +203,7 @@ def test_disallow_net_connect_2():
def foo():
fd = None
try:
- fd = urllib2.urlopen('http://example.com/nonsense')
+ fd = urllib2.urlopen("http://example.com/nonsense")
finally:
if fd:
fd.close()
@@ -204,9 +215,8 @@ def test_disallow_net_connect_2():
def test_disallow_net_connect_3():
"When allow_net_connect = False, mocked requests still work correctly."
- httpretty.register_uri(httpretty.GET, "http://falcao.it/foo/",
- body="BAR")
- fd = urllib2.urlopen('http://falcao.it/foo/')
+ httpretty.register_uri(httpretty.GET, "http://falcao.it/foo/", body="BAR")
+ fd = urllib2.urlopen("http://falcao.it/foo/")
got1 = fd.read()
fd.close()
- expect(got1).to.equal(b'BAR')
+ expect(got1).to.equal(b"BAR")
diff --git a/tests/functional/test_debug.py b/tests/functional/test_debug.py
index 995da8d..577038e 100644
--- a/tests/functional/test_debug.py
+++ b/tests/functional/test_debug.py
@@ -31,11 +31,7 @@ from httpretty import httprettified
def create_socket(context):
- context.sock = socket.socket(
- socket.AF_INET,
- socket.SOCK_STREAM,
- socket.IPPROTO_TCP,
- )
+ context.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
context.sock.is_http = True
@@ -45,8 +41,7 @@ def test_httpretty_debugs_socket_send(context):
"HTTPretty should debug socket.send"
expect(context.sock.send).when.called.to.throw(
- RuntimeError,
- "HTTPretty intercepted and unexpected socket method call."
+ RuntimeError, "HTTPretty intercepted and unexpected socket method call."
)
@@ -56,8 +51,7 @@ def test_httpretty_debugs_socket_sendto(context):
"HTTPretty should debug socket.sendto"
expect(context.sock.sendto).when.called.to.throw(
- RuntimeError,
- "HTTPretty intercepted and unexpected socket method call."
+ RuntimeError, "HTTPretty intercepted and unexpected socket method call."
)
@@ -67,8 +61,7 @@ def test_httpretty_debugs_socket_recv(context):
"HTTPretty should debug socket.recv"
expect(context.sock.recv).when.called.to.throw(
- RuntimeError,
- "HTTPretty intercepted and unexpected socket method call."
+ RuntimeError, "HTTPretty intercepted and unexpected socket method call."
)
@@ -78,8 +71,7 @@ def test_httpretty_debugs_socket_recvfrom(context):
"HTTPretty should debug socket.recvfrom"
expect(context.sock.recvfrom).when.called.to.throw(
- RuntimeError,
- "HTTPretty intercepted and unexpected socket method call."
+ RuntimeError, "HTTPretty intercepted and unexpected socket method call."
)
@@ -89,8 +81,7 @@ def test_httpretty_debugs_socket_recv_into(context):
"HTTPretty should debug socket.recv_into"
expect(context.sock.recv_into).when.called.to.throw(
- RuntimeError,
- "HTTPretty intercepted and unexpected socket method call."
+ RuntimeError, "HTTPretty intercepted and unexpected socket method call."
)
@@ -100,6 +91,5 @@ def test_httpretty_debugs_socket_recvfrom_into(context):
"HTTPretty should debug socket.recvfrom_into"
expect(context.sock.recvfrom_into).when.called.to.throw(
- RuntimeError,
- "HTTPretty intercepted and unexpected socket method call."
+ RuntimeError, "HTTPretty intercepted and unexpected socket method call."
)
diff --git a/tests/functional/test_decorator.py b/tests/functional/test_decorator.py
index d1f156a..5834d33 100644
--- a/tests/functional/test_decorator.py
+++ b/tests/functional/test_decorator.py
@@ -11,34 +11,30 @@ except ImportError:
@httprettified
def test_decor():
- HTTPretty.register_uri(
- HTTPretty.GET, "http://localhost/",
- body="glub glub")
+ HTTPretty.register_uri(HTTPretty.GET, "http://localhost/", body="glub glub")
- fd = urllib2.urlopen('http://localhost/')
+ fd = urllib2.urlopen("http://localhost/")
got1 = fd.read()
fd.close()
- expect(got1).to.equal(b'glub glub')
+ expect(got1).to.equal(b"glub glub")
@httprettified
class DecoratedNonUnitTest(object):
-
def test_fail(self):
- raise AssertionError('Tests in this class should not '
- 'be executed by the test runner.')
+ raise AssertionError(
+ "Tests in this class should not " "be executed by the test runner."
+ )
def test_decorated(self):
- HTTPretty.register_uri(
- HTTPretty.GET, "http://localhost/",
- body="glub glub")
+ HTTPretty.register_uri(HTTPretty.GET, "http://localhost/", body="glub glub")
- fd = urllib2.urlopen('http://localhost/')
+ fd = urllib2.urlopen("http://localhost/")
got1 = fd.read()
fd.close()
- expect(got1).to.equal(b'glub glub')
+ expect(got1).to.equal(b"glub glub")
class NonUnitTestTest(TestCase):
@@ -52,65 +48,61 @@ class NonUnitTestTest(TestCase):
@httprettified
class ClassDecorator(TestCase):
-
def test_decorated(self):
- HTTPretty.register_uri(
- HTTPretty.GET, "http://localhost/",
- body="glub glub")
+ HTTPretty.register_uri(HTTPretty.GET, "http://localhost/", body="glub glub")
- fd = urllib2.urlopen('http://localhost/')
+ fd = urllib2.urlopen("http://localhost/")
got1 = fd.read()
fd.close()
- expect(got1).to.equal(b'glub glub')
+ expect(got1).to.equal(b"glub glub")
def test_decorated2(self):
- HTTPretty.register_uri(
- HTTPretty.GET, "http://localhost/",
- body="buble buble")
+ HTTPretty.register_uri(HTTPretty.GET, "http://localhost/", body="buble buble")
- fd = urllib2.urlopen('http://localhost/')
+ fd = urllib2.urlopen("http://localhost/")
got1 = fd.read()
fd.close()
- expect(got1).to.equal(b'buble buble')
+ expect(got1).to.equal(b"buble buble")
@httprettified
class ClassDecoratorWithSetUp(TestCase):
-
def setUp(self):
HTTPretty.register_uri(
- HTTPretty.GET, "http://localhost/",
+ HTTPretty.GET,
+ "http://localhost/",
responses=[
HTTPretty.Response("glub glub"),
HTTPretty.Response("buble buble"),
- ])
+ ],
+ )
def test_decorated(self):
- fd = urllib2.urlopen('http://localhost/')
+ fd = urllib2.urlopen("http://localhost/")
got1 = fd.read()
fd.close()
- expect(got1).to.equal(b'glub glub')
+ expect(got1).to.equal(b"glub glub")
- fd = urllib2.urlopen('http://localhost/')
+ fd = urllib2.urlopen("http://localhost/")
got2 = fd.read()
fd.close()
- expect(got2).to.equal(b'buble buble')
+ expect(got2).to.equal(b"buble buble")
def test_decorated2(self):
- fd = urllib2.urlopen('http://localhost/')
+ fd = urllib2.urlopen("http://localhost/")
got1 = fd.read()
fd.close()
- expect(got1).to.equal(b'glub glub')
+ expect(got1).to.equal(b"glub glub")
- fd = urllib2.urlopen('http://localhost/')
+ fd = urllib2.urlopen("http://localhost/")
got2 = fd.read()
fd.close()
- expect(got2).to.equal(b'buble buble')
+ expect(got2).to.equal(b"buble buble")
diff --git a/tests/functional/test_fakesocket.py b/tests/functional/test_fakesocket.py
index 215d074..edacd77 100644
--- a/tests/functional/test_fakesocket.py
+++ b/tests/functional/test_fakesocket.py
@@ -36,6 +36,7 @@ class FakeSocket(socket.socket):
Just an editable socket factory
It allows mock to patch readonly functions
"""
+
connect = sendall = lambda *args, **kw: None
@@ -50,26 +51,24 @@ def recv(flag, size):
the asked size passed in argument.
Any further call will just raise RuntimeError
"""
- if 'was_here' in flag:
- raise RuntimeError('Already sent everything')
+ if "was_here" in flag:
+ raise RuntimeError("Already sent everything")
else:
- flag['was_here'] = None
- return 'a' * (size - 1)
+ flag["was_here"] = None
+ return "a" * (size - 1)
recv = functools.partial(recv, fake_socket_interupter_flag)
-@mock.patch('httpretty.old_socket', new=FakeSocket)
+@mock.patch("httpretty.old_socket", new=FakeSocket)
def _test_shorten_response():
u"HTTPretty shouldn't try to read from server when communication is over"
from sure import expect
import httpretty
- fakesocket = httpretty.fakesock.socket(socket.AF_INET,
- socket.SOCK_STREAM)
- with mock.patch.object(fakesocket.truesock, 'recv', recv):
- fakesocket.connect(('localhost', 80))
- fakesocket._true_sendall('WHATEVER')
- expect(fakesocket.fd.read()).to.equal(
- 'a' * (httpretty.socket_buffer_size - 1))
+ fakesocket = httpretty.fakesock.socket(socket.AF_INET, socket.SOCK_STREAM)
+ with mock.patch.object(fakesocket.truesock, "recv", recv):
+ fakesocket.connect(("localhost", 80))
+ fakesocket._true_sendall("WHATEVER")
+ expect(fakesocket.fd.read()).to.equal("a" * (httpretty.socket_buffer_size - 1))
diff --git a/tests/functional/test_httplib2.py b/tests/functional/test_httplib2.py
index 9aa6a5b..8d4f947 100644
--- a/tests/functional/test_httplib2.py
+++ b/tests/functional/test_httplib2.py
@@ -38,14 +38,15 @@ from httpretty.core import decode_utf8
def test_httpretty_should_mock_a_simple_get_with_httplib2_read(now):
"HTTPretty should mock a simple GET with httplib2.context.http"
- HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/",
- body="Find the best daily deals")
+ HTTPretty.register_uri(
+ HTTPretty.GET, "http://yipit.com/", body="Find the best daily deals"
+ )
- _, got = httplib2.Http().request('http://yipit.com', 'GET')
- expect(got).to.equal(b'Find the best daily deals')
+ _, got = httplib2.Http().request("http://yipit.com", "GET")
+ expect(got).to.equal(b"Find the best daily deals")
- expect(HTTPretty.last_request.method).to.equal('GET')
- expect(HTTPretty.last_request.path).to.equal('/')
+ expect(HTTPretty.last_request.method).to.equal("GET")
+ expect(HTTPretty.last_request.path).to.equal("/")
@httprettified
@@ -53,14 +54,14 @@ def test_httpretty_should_mock_a_simple_get_with_httplib2_read(now):
def test_httpretty_provides_easy_access_to_querystrings(now):
"HTTPretty should provide an easy access to the querystring"
- HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/",
- body="Find the best daily deals")
+ HTTPretty.register_uri(
+ HTTPretty.GET, "http://yipit.com/", body="Find the best daily deals"
+ )
- httplib2.Http().request('http://yipit.com?foo=bar&foo=baz&chuck=norris', 'GET')
- expect(HTTPretty.last_request.querystring).to.equal({
- 'foo': ['bar', 'baz'],
- 'chuck': ['norris'],
- })
+ httplib2.Http().request("http://yipit.com?foo=bar&foo=baz&chuck=norris", "GET")
+ expect(HTTPretty.last_request.querystring).to.equal(
+ {"foo": ["bar", "baz"], "chuck": ["norris"]}
+ )
@httprettified
@@ -68,20 +69,25 @@ def test_httpretty_provides_easy_access_to_querystrings(now):
def test_httpretty_should_mock_headers_httplib2(now):
"HTTPretty should mock basic headers with httplib2"
- HTTPretty.register_uri(HTTPretty.GET, "http://github.com/",
- body="this is supposed to be the response",
- status=201)
+ HTTPretty.register_uri(
+ HTTPretty.GET,
+ "http://github.com/",
+ body="this is supposed to be the response",
+ status=201,
+ )
- headers, _ = httplib2.Http().request('http://github.com', 'GET')
- expect(headers['status']).to.equal('201')
- expect(dict(headers)).to.equal({
- 'content-type': 'text/plain; charset=utf-8',
- 'connection': 'close',
- 'content-length': '35',
- 'status': '201',
- 'server': 'Python/HTTPretty',
- 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
- })
+ headers, _ = httplib2.Http().request("http://github.com", "GET")
+ expect(headers["status"]).to.equal("201")
+ expect(dict(headers)).to.equal(
+ {
+ "content-type": "text/plain; charset=utf-8",
+ "connection": "close",
+ "content-length": "35",
+ "status": "201",
+ "server": "Python/HTTPretty",
+ "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"),
+ }
+ )
@httprettified
@@ -89,25 +95,30 @@ def test_httpretty_should_mock_headers_httplib2(now):
def test_httpretty_should_allow_adding_and_overwritting_httplib2(now):
"HTTPretty should allow adding and overwritting headers with httplib2"
- HTTPretty.register_uri(HTTPretty.GET, "http://github.com/foo",
- body="this is supposed to be the response",
- adding_headers={
- 'Server': 'Apache',
- 'Content-Length': '27',
- 'Content-Type': 'application/json',
- })
-
- headers, _ = httplib2.Http().request('http://github.com/foo', 'GET')
+ HTTPretty.register_uri(
+ HTTPretty.GET,
+ "http://github.com/foo",
+ body="this is supposed to be the response",
+ adding_headers={
+ "Server": "Apache",
+ "Content-Length": "27",
+ "Content-Type": "application/json",
+ },
+ )
- expect(dict(headers)).to.equal({
- 'content-type': 'application/json',
- 'content-location': 'http://github.com/foo',
- 'connection': 'close',
- 'content-length': '27',
- 'status': '200',
- 'server': 'Apache',
- 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
- })
+ headers, _ = httplib2.Http().request("http://github.com/foo", "GET")
+
+ expect(dict(headers)).to.equal(
+ {
+ "content-type": "application/json",
+ "content-location": "http://github.com/foo",
+ "connection": "close",
+ "content-length": "27",
+ "status": "200",
+ "server": "Apache",
+ "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"),
+ }
+ )
@httprettified
@@ -115,52 +126,59 @@ def test_httpretty_should_allow_adding_and_overwritting_httplib2(now):
def test_httpretty_should_allow_forcing_headers_httplib2(now):
"HTTPretty should allow forcing headers with httplib2"
- HTTPretty.register_uri(HTTPretty.GET, "http://github.com/foo",
- body="this is supposed to be the response",
- forcing_headers={
- 'Content-Type': 'application/xml',
- })
-
- headers, _ = httplib2.Http().request('http://github.com/foo', 'GET')
+ HTTPretty.register_uri(
+ HTTPretty.GET,
+ "http://github.com/foo",
+ body="this is supposed to be the response",
+ forcing_headers={"Content-Type": "application/xml"},
+ )
- expect(dict(headers)).to.equal({
- 'content-location': 'http://github.com/foo', # httplib2 FORCES
- # content-location
- # even if the
- # server does not
- # provide it
- 'content-type': 'application/xml',
- 'status': '200', # httplib2 also ALWAYS put status on headers
- })
+ headers, _ = httplib2.Http().request("http://github.com/foo", "GET")
+
+ expect(dict(headers)).to.equal(
+ {
+ "content-location": "http://github.com/foo", # httplib2 FORCES
+ # content-location
+ # even if the
+ # server does not
+ # provide it
+ "content-type": "application/xml",
+ "status": "200", # httplib2 also ALWAYS put status on headers
+ }
+ )
@httprettified
@within(two=microseconds)
def test_httpretty_should_allow_adding_and_overwritting_by_kwargs_u2(now):
- "HTTPretty should allow adding and overwritting headers by keyword args " \
- "with httplib2"
-
- HTTPretty.register_uri(HTTPretty.GET, "http://github.com/foo",
- body="this is supposed to be the response",
- server='Apache',
- content_length='27',
- content_type='application/json')
-
- headers, _ = httplib2.Http().request('http://github.com/foo', 'GET')
-
- expect(dict(headers)).to.equal({
- 'content-type': 'application/json',
- 'content-location': 'http://github.com/foo', # httplib2 FORCES
- # content-location
- # even if the
- # server does not
- # provide it
- 'connection': 'close',
- 'content-length': '27',
- 'status': '200',
- 'server': 'Apache',
- 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
- })
+ "HTTPretty should allow adding and overwritting headers by keyword args " "with httplib2"
+
+ HTTPretty.register_uri(
+ HTTPretty.GET,
+ "http://github.com/foo",
+ body="this is supposed to be the response",
+ server="Apache",
+ content_length="27",
+ content_type="application/json",
+ )
+
+ headers, _ = httplib2.Http().request("http://github.com/foo", "GET")
+
+ expect(dict(headers)).to.equal(
+ {
+ "content-type": "application/json",
+ "content-location": "http://github.com/foo", # httplib2 FORCES
+ # content-location
+ # even if the
+ # server does not
+ # provide it
+ "connection": "close",
+ "content-length": "27",
+ "status": "200",
+ "server": "Apache",
+ "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"),
+ }
+ )
@httprettified
@@ -169,29 +187,28 @@ def test_rotating_responses_with_httplib2(now):
"HTTPretty should support rotating responses with httplib2"
HTTPretty.register_uri(
- HTTPretty.GET, "https://api.yahoo.com/test",
+ HTTPretty.GET,
+ "https://api.yahoo.com/test",
responses=[
HTTPretty.Response(body="first response", status=201),
- HTTPretty.Response(body='second and last response', status=202),
- ])
+ HTTPretty.Response(body="second and last response", status=202),
+ ],
+ )
- headers1, body1 = httplib2.Http().request(
- 'https://api.yahoo.com/test', 'GET')
+ headers1, body1 = httplib2.Http().request("https://api.yahoo.com/test", "GET")
- expect(headers1['status']).to.equal('201')
- expect(body1).to.equal(b'first response')
+ expect(headers1["status"]).to.equal("201")
+ expect(body1).to.equal(b"first response")
- headers2, body2 = httplib2.Http().request(
- 'https://api.yahoo.com/test', 'GET')
+ headers2, body2 = httplib2.Http().request("https://api.yahoo.com/test", "GET")
- expect(headers2['status']).to.equal('202')
- expect(body2).to.equal(b'second and last response')
+ expect(headers2["status"]).to.equal("202")
+ expect(body2).to.equal(b"second and last response")
- headers3, body3 = httplib2.Http().request(
- 'https://api.yahoo.com/test', 'GET')
+ headers3, body3 = httplib2.Http().request("https://api.yahoo.com/test", "GET")
- expect(headers3['status']).to.equal('202')
- expect(body3).to.equal(b'second and last response')
+ expect(headers3["status"]).to.equal("202")
+ expect(body3).to.equal(b"second and last response")
@httprettified
@@ -199,24 +216,22 @@ def test_rotating_responses_with_httplib2(now):
def test_can_inspect_last_request(now):
"HTTPretty.last_request is a mimetools.Message request from last match"
- HTTPretty.register_uri(HTTPretty.POST, "http://api.github.com/",
- body='{"repositories": ["HTTPretty", "lettuce"]}')
+ HTTPretty.register_uri(
+ HTTPretty.POST,
+ "http://api.github.com/",
+ body='{"repositories": ["HTTPretty", "lettuce"]}',
+ )
headers, body = httplib2.Http().request(
- 'http://api.github.com', 'POST',
+ "http://api.github.com",
+ "POST",
body='{"username": "gabrielfalcao"}',
- headers={
- 'content-type': 'text/json',
- },
+ headers={"content-type": "text/json"},
)
- expect(HTTPretty.last_request.method).to.equal('POST')
- expect(HTTPretty.last_request.body).to.equal(
- b'{"username": "gabrielfalcao"}',
- )
- expect(HTTPretty.last_request.headers['content-type']).to.equal(
- 'text/json',
- )
+ expect(HTTPretty.last_request.method).to.equal("POST")
+ expect(HTTPretty.last_request.body).to.equal(b'{"username": "gabrielfalcao"}')
+ expect(HTTPretty.last_request.headers["content-type"]).to.equal("text/json")
expect(body).to.equal(b'{"repositories": ["HTTPretty", "lettuce"]}')
@@ -225,24 +240,22 @@ def test_can_inspect_last_request(now):
def test_can_inspect_last_request_with_ssl(now):
"HTTPretty.last_request is recorded even when mocking 'https' (SSL)"
- HTTPretty.register_uri(HTTPretty.POST, "https://secure.github.com/",
- body='{"repositories": ["HTTPretty", "lettuce"]}')
+ HTTPretty.register_uri(
+ HTTPretty.POST,
+ "https://secure.github.com/",
+ body='{"repositories": ["HTTPretty", "lettuce"]}',
+ )
headers, body = httplib2.Http().request(
- 'https://secure.github.com', 'POST',
+ "https://secure.github.com",
+ "POST",
body='{"username": "gabrielfalcao"}',
- headers={
- 'content-type': 'text/json',
- },
+ headers={"content-type": "text/json"},
)
- expect(HTTPretty.last_request.method).to.equal('POST')
- expect(HTTPretty.last_request.body).to.equal(
- b'{"username": "gabrielfalcao"}',
- )
- expect(HTTPretty.last_request.headers['content-type']).to.equal(
- 'text/json',
- )
+ expect(HTTPretty.last_request.method).to.equal("POST")
+ expect(HTTPretty.last_request.body).to.equal(b'{"username": "gabrielfalcao"}')
+ expect(HTTPretty.last_request.headers["content-type"]).to.equal("text/json")
expect(body).to.equal(b'{"repositories": ["HTTPretty", "lettuce"]}')
@@ -251,40 +264,42 @@ def test_can_inspect_last_request_with_ssl(now):
def test_httpretty_ignores_querystrings_from_registered_uri(now):
"Registering URIs with query string cause them to be ignored"
- HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/?id=123",
- body="Find the best daily deals")
+ HTTPretty.register_uri(
+ HTTPretty.GET, "http://yipit.com/?id=123", body="Find the best daily deals"
+ )
- _, got = httplib2.Http().request('http://yipit.com/?id=123', 'GET')
+ _, got = httplib2.Http().request("http://yipit.com/?id=123", "GET")
- expect(got).to.equal(b'Find the best daily deals')
- expect(HTTPretty.last_request.method).to.equal('GET')
- expect(HTTPretty.last_request.path).to.equal('/?id=123')
+ expect(got).to.equal(b"Find the best daily deals")
+ expect(HTTPretty.last_request.method).to.equal("GET")
+ expect(HTTPretty.last_request.path).to.equal("/?id=123")
@httprettified
@within(two=microseconds)
def test_callback_response(now):
- ("HTTPretty should all a callback function to be set as the body with"
- " httplib2")
+ ("HTTPretty should all a callback function to be set as the body with" " httplib2")
def request_callback(request, uri, headers):
- return [200, headers, "The {} response from {}".format(decode_utf8(request.method), uri)]
+ return [
+ 200,
+ headers,
+ "The {} response from {}".format(decode_utf8(request.method), uri),
+ ]
HTTPretty.register_uri(
- HTTPretty.GET, "https://api.yahoo.com/test",
- body=request_callback)
+ HTTPretty.GET, "https://api.yahoo.com/test", body=request_callback
+ )
- headers1, body1 = httplib2.Http().request(
- 'https://api.yahoo.com/test', 'GET')
+ headers1, body1 = httplib2.Http().request("https://api.yahoo.com/test", "GET")
expect(body1).to.equal(b"The GET response from https://api.yahoo.com/test")
HTTPretty.register_uri(
- HTTPretty.POST, "https://api.yahoo.com/test_post",
- body=request_callback)
+ HTTPretty.POST, "https://api.yahoo.com/test_post", body=request_callback
+ )
- headers2, body2 = httplib2.Http().request(
- 'https://api.yahoo.com/test_post', 'POST')
+ headers2, body2 = httplib2.Http().request("https://api.yahoo.com/test_post", "POST")
expect(body2).to.equal(b"The POST response from https://api.yahoo.com/test_post")
@@ -299,7 +314,9 @@ def test_httpretty_should_allow_registering_regexes():
body="Found brand",
)
- response, body = httplib2.Http().request('https://api.yipit.com/v1/deal;brand=gap', 'GET')
- expect(body).to.equal(b'Found brand')
- expect(HTTPretty.last_request.method).to.equal('GET')
- expect(HTTPretty.last_request.path).to.equal('/v1/deal;brand=gap')
+ response, body = httplib2.Http().request(
+ "https://api.yipit.com/v1/deal;brand=gap", "GET"
+ )
+ expect(body).to.equal(b"Found brand")
+ expect(HTTPretty.last_request.method).to.equal("GET")
+ expect(HTTPretty.last_request.path).to.equal("/v1/deal;brand=gap")
diff --git a/tests/functional/test_passthrough.py b/tests/functional/test_passthrough.py
index f39cf2a..a37c09b 100644
--- a/tests/functional/test_passthrough.py
+++ b/tests/functional/test_passthrough.py
@@ -33,16 +33,16 @@ from httpretty import HTTPretty
@skip
def test_http_passthrough():
- url = 'http://httpbin.org/status/200'
+ url = "http://httpbin.org/status/200"
response1 = requests.get(url)
response1 = requests.get(url, stream=True)
HTTPretty.enable()
- HTTPretty.register_uri(HTTPretty.GET, 'http://google.com/', body="Not Google")
+ HTTPretty.register_uri(HTTPretty.GET, "http://google.com/", body="Not Google")
- response2 = requests.get('http://google.com/')
- expect(response2.content).to.equal(b'Not Google')
+ response2 = requests.get("http://google.com/")
+ expect(response2.content).to.equal(b"Not Google")
response3 = requests.get(url, stream=True)
(response3.content).should.equal(response1.content)
@@ -55,15 +55,15 @@ def test_http_passthrough():
@skip
def test_https_passthrough():
- url = 'https://raw.githubusercontent.com/gabrielfalcao/HTTPretty/master/COPYING'
+ url = "https://raw.githubusercontent.com/gabrielfalcao/HTTPretty/master/COPYING"
response1 = requests.get(url, stream=True)
HTTPretty.enable()
- HTTPretty.register_uri(HTTPretty.GET, 'https://google.com/', body="Not Google")
+ HTTPretty.register_uri(HTTPretty.GET, "https://google.com/", body="Not Google")
- response2 = requests.get('https://google.com/')
- expect(response2.content).to.equal(b'Not Google')
+ response2 = requests.get("https://google.com/")
+ expect(response2.content).to.equal(b"Not Google")
response3 = requests.get(url, stream=True)
(response3.content).should.equal(response1.content)
diff --git a/tests/functional/test_requests.py b/tests/functional/test_requests.py
index 06bdc1e..e1a0a30 100644
--- a/tests/functional/test_requests.py
+++ b/tests/functional/test_requests.py
@@ -54,13 +54,14 @@ else:
try:
advance_iterator = next
except NameError:
+
def advance_iterator(it):
return it.next()
next = advance_iterator
-server_url = lambda path, port: "http://localhost:{}/{}".format(port, path.lstrip('/'))
+server_url = lambda path, port: "http://localhost:{}/{}".format(port, path.lstrip("/"))
@httprettified
@@ -68,13 +69,14 @@ server_url = lambda path, port: "http://localhost:{}/{}".format(port, path.lstri
def test_httpretty_should_mock_a_simple_get_with_requests_read(now):
"HTTPretty should mock a simple GET with requests.get"
- HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/",
- body="Find the best daily deals")
+ HTTPretty.register_uri(
+ HTTPretty.GET, "http://yipit.com/", body="Find the best daily deals"
+ )
- response = requests.get('http://yipit.com')
- expect(response.text).to.equal('Find the best daily deals')
- expect(HTTPretty.last_request.method).to.equal('GET')
- expect(HTTPretty.last_request.path).to.equal('/')
+ response = requests.get("http://yipit.com")
+ expect(response.text).to.equal("Find the best daily deals")
+ expect(HTTPretty.last_request.method).to.equal("GET")
+ expect(HTTPretty.last_request.path).to.equal("/")
@httprettified
@@ -82,14 +84,14 @@ def test_httpretty_should_mock_a_simple_get_with_requests_read(now):
def test_httpretty_provides_easy_access_to_querystrings(now):
"HTTPretty should provide an easy access to the querystring"
- HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/",
- body="Find the best daily deals")
+ HTTPretty.register_uri(
+ HTTPretty.GET, "http://yipit.com/", body="Find the best daily deals"
+ )
- requests.get('http://yipit.com/?foo=bar&foo=baz&chuck=norris')
- expect(HTTPretty.last_request.querystring).to.equal({
- 'foo': ['bar', 'baz'],
- 'chuck': ['norris'],
- })
+ requests.get("http://yipit.com/?foo=bar&foo=baz&chuck=norris")
+ expect(HTTPretty.last_request.querystring).to.equal(
+ {"foo": ["bar", "baz"], "chuck": ["norris"]}
+ )
@httprettified
@@ -97,21 +99,26 @@ def test_httpretty_provides_easy_access_to_querystrings(now):
def test_httpretty_should_mock_headers_requests(now):
"HTTPretty should mock basic headers with requests"
- HTTPretty.register_uri(HTTPretty.GET, "http://github.com/",
- body="this is supposed to be the response",
- status=201)
+ HTTPretty.register_uri(
+ HTTPretty.GET,
+ "http://github.com/",
+ body="this is supposed to be the response",
+ status=201,
+ )
- response = requests.get('http://github.com')
+ response = requests.get("http://github.com")
expect(response.status_code).to.equal(201)
- expect(dict(response.headers)).to.equal({
- 'content-type': 'text/plain; charset=utf-8',
- 'connection': 'close',
- 'content-length': '35',
- 'status': '201',
- 'server': 'Python/HTTPretty',
- 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
- })
+ expect(dict(response.headers)).to.equal(
+ {
+ "content-type": "text/plain; charset=utf-8",
+ "connection": "close",
+ "content-length": "35",
+ "status": "201",
+ "server": "Python/HTTPretty",
+ "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"),
+ }
+ )
@httprettified
@@ -119,24 +126,29 @@ def test_httpretty_should_mock_headers_requests(now):
def test_httpretty_should_allow_adding_and_overwritting_requests(now):
"HTTPretty should allow adding and overwritting headers with requests"
- HTTPretty.register_uri(HTTPretty.GET, "http://github.com/foo",
- body="this is supposed to be the response",
- adding_headers={
- 'Server': 'Apache',
- 'Content-Length': '27',
- 'Content-Type': 'application/json',
- })
-
- response = requests.get('http://github.com/foo')
+ HTTPretty.register_uri(
+ HTTPretty.GET,
+ "http://github.com/foo",
+ body="this is supposed to be the response",
+ adding_headers={
+ "Server": "Apache",
+ "Content-Length": "27",
+ "Content-Type": "application/json",
+ },
+ )
- expect(dict(response.headers)).to.equal({
- 'content-type': 'application/json',
- 'connection': 'close',
- 'content-length': '27',
- 'status': '200',
- 'server': 'Apache',
- 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
- })
+ response = requests.get("http://github.com/foo")
+
+ expect(dict(response.headers)).to.equal(
+ {
+ "content-type": "application/json",
+ "connection": "close",
+ "content-length": "27",
+ "status": "200",
+ "server": "Apache",
+ "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"),
+ }
+ )
@httprettified
@@ -144,43 +156,46 @@ def test_httpretty_should_allow_adding_and_overwritting_requests(now):
def test_httpretty_should_allow_forcing_headers_requests(now):
"HTTPretty should allow forcing headers with requests"
- HTTPretty.register_uri(HTTPretty.GET, "http://github.com/foo",
- body="<root><baz /</root>",
- forcing_headers={
- 'Content-Type': 'application/xml',
- 'Content-Length': '19',
- })
+ HTTPretty.register_uri(
+ HTTPretty.GET,
+ "http://github.com/foo",
+ body="<root><baz /</root>",
+ forcing_headers={"Content-Type": "application/xml", "Content-Length": "19"},
+ )
- response = requests.get('http://github.com/foo')
+ response = requests.get("http://github.com/foo")
- expect(dict(response.headers)).to.equal({
- 'content-type': 'application/xml',
- 'content-length': '19',
- })
+ expect(dict(response.headers)).to.equal(
+ {"content-type": "application/xml", "content-length": "19"}
+ )
@httprettified
@within(two=microseconds)
def test_httpretty_should_allow_adding_and_overwritting_by_kwargs_u2(now):
- "HTTPretty should allow adding and overwritting headers by keyword args " \
- "with requests"
+ "HTTPretty should allow adding and overwritting headers by keyword args " "with requests"
- HTTPretty.register_uri(HTTPretty.GET, "http://github.com/foo",
- body="this is supposed to be the response",
- server='Apache',
- content_length='27',
- content_type='application/json')
-
- response = requests.get('http://github.com/foo')
+ HTTPretty.register_uri(
+ HTTPretty.GET,
+ "http://github.com/foo",
+ body="this is supposed to be the response",
+ server="Apache",
+ content_length="27",
+ content_type="application/json",
+ )
- expect(dict(response.headers)).to.equal({
- 'content-type': 'application/json',
- 'connection': 'close',
- 'content-length': '27',
- 'status': '200',
- 'server': 'Apache',
- 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
- })
+ response = requests.get("http://github.com/foo")
+
+ expect(dict(response.headers)).to.equal(
+ {
+ "content-type": "application/json",
+ "connection": "close",
+ "content-length": "27",
+ "status": "200",
+ "server": "Apache",
+ "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"),
+ }
+ )
@httprettified
@@ -189,29 +204,28 @@ def test_rotating_responses_with_requests(now):
"HTTPretty should support rotating responses with requests"
HTTPretty.register_uri(
- HTTPretty.GET, "https://api.yahoo.com/test",
+ HTTPretty.GET,
+ "https://api.yahoo.com/test",
responses=[
HTTPretty.Response(body=b"first response", status=201),
- HTTPretty.Response(body=b'second and last response', status=202),
- ])
+ HTTPretty.Response(body=b"second and last response", status=202),
+ ],
+ )
- response1 = requests.get(
- 'https://api.yahoo.com/test')
+ response1 = requests.get("https://api.yahoo.com/test")
expect(response1.status_code).to.equal(201)
- expect(response1.text).to.equal('first response')
+ expect(response1.text).to.equal("first response")
- response2 = requests.get(
- 'https://api.yahoo.com/test')
+ response2 = requests.get("https://api.yahoo.com/test")
expect(response2.status_code).to.equal(202)
- expect(response2.text).to.equal('second and last response')
+ expect(response2.text).to.equal("second and last response")
- response3 = requests.get(
- 'https://api.yahoo.com/test')
+ response3 = requests.get("https://api.yahoo.com/test")
expect(response3.status_code).to.equal(202)
- expect(response3.text).to.equal('second and last response')
+ expect(response3.text).to.equal("second and last response")
@httprettified
@@ -219,24 +233,21 @@ def test_rotating_responses_with_requests(now):
def test_can_inspect_last_request(now):
"HTTPretty.last_request is a mimetools.Message request from last match"
- HTTPretty.register_uri(HTTPretty.POST, "http://api.github.com/",
- body='{"repositories": ["HTTPretty", "lettuce"]}')
+ HTTPretty.register_uri(
+ HTTPretty.POST,
+ "http://api.github.com/",
+ body='{"repositories": ["HTTPretty", "lettuce"]}',
+ )
response = requests.post(
- 'http://api.github.com',
+ "http://api.github.com",
'{"username": "gabrielfalcao"}',
- headers={
- 'content-type': 'text/json',
- },
+ headers={"content-type": "text/json"},
)
- expect(HTTPretty.last_request.method).to.equal('POST')
- expect(HTTPretty.last_request.body).to.equal(
- b'{"username": "gabrielfalcao"}',
- )
- expect(HTTPretty.last_request.headers['content-type']).to.equal(
- 'text/json',
- )
+ expect(HTTPretty.last_request.method).to.equal("POST")
+ expect(HTTPretty.last_request.body).to.equal(b'{"username": "gabrielfalcao"}')
+ expect(HTTPretty.last_request.headers["content-type"]).to.equal("text/json")
expect(response.json()).to.equal({"repositories": ["HTTPretty", "lettuce"]})
@@ -245,24 +256,21 @@ def test_can_inspect_last_request(now):
def test_can_inspect_last_request_with_ssl(now):
"HTTPretty.last_request is recorded even when mocking 'https' (SSL)"
- HTTPretty.register_uri(HTTPretty.POST, "https://secure.github.com/",
- body='{"repositories": ["HTTPretty", "lettuce"]}')
+ HTTPretty.register_uri(
+ HTTPretty.POST,
+ "https://secure.github.com/",
+ body='{"repositories": ["HTTPretty", "lettuce"]}',
+ )
response = requests.post(
- 'https://secure.github.com',
+ "https://secure.github.com",
'{"username": "gabrielfalcao"}',
- headers={
- 'content-type': 'text/json',
- },
+ headers={"content-type": "text/json"},
)
- expect(HTTPretty.last_request.method).to.equal('POST')
- expect(HTTPretty.last_request.body).to.equal(
- b'{"username": "gabrielfalcao"}',
- )
- expect(HTTPretty.last_request.headers['content-type']).to.equal(
- 'text/json',
- )
+ expect(HTTPretty.last_request.method).to.equal("POST")
+ expect(HTTPretty.last_request.body).to.equal(b'{"username": "gabrielfalcao"}')
+ expect(HTTPretty.last_request.headers["content-type"]).to.equal("text/json")
expect(response.json()).to.equal({"repositories": ["HTTPretty", "lettuce"]})
@@ -271,13 +279,14 @@ def test_can_inspect_last_request_with_ssl(now):
def test_httpretty_ignores_querystrings_from_registered_uri(now):
"HTTPretty should ignore querystrings from the registered uri (requests library)"
- HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/?id=123",
- body=b"Find the best daily deals")
+ HTTPretty.register_uri(
+ HTTPretty.GET, "http://yipit.com/?id=123", body=b"Find the best daily deals"
+ )
- response = requests.get('http://yipit.com/', params={'id': 123})
- expect(response.text).to.equal('Find the best daily deals')
- expect(HTTPretty.last_request.method).to.equal('GET')
- expect(HTTPretty.last_request.path).to.equal('/?id=123')
+ response = requests.get("http://yipit.com/", params={"id": 123})
+ expect(response.text).to.equal("Find the best daily deals")
+ expect(HTTPretty.last_request.method).to.equal("GET")
+ expect(HTTPretty.last_request.path).to.equal("/?id=123")
@httprettified
@@ -300,6 +309,7 @@ def test_streaming_responses(now):
def handler(signum, frame):
raise AssertionError(message)
+
signal.signal(signal.SIGALRM, handler)
signal.setitimer(signal.ITIMER_REAL, time)
yield
@@ -308,253 +318,270 @@ def test_streaming_responses(now):
# XXX this obviously isn't a fully functional twitter streaming client!
twitter_response_lines = [
b'{"text":"If \\"for the boobs\\" requests to follow me one more time I\'m calling the police. http://t.co/a0mDEAD8"}\r\n',
- b'\r\n',
- b'{"text":"RT @onedirection: Thanks for all your # FollowMe1D requests Directioners! We\u2019ll be following 10 people throughout the day starting NOW. G ..."}\r\n'
+ b"\r\n",
+ b'{"text":"RT @onedirection: Thanks for all your # FollowMe1D requests Directioners! We\u2019ll be following 10 people throughout the day starting NOW. G ..."}\r\n',
]
TWITTER_STREAMING_URL = "https://stream.twitter.com/1/statuses/filter.json"
- HTTPretty.register_uri(HTTPretty.POST, TWITTER_STREAMING_URL,
- body=(l for l in twitter_response_lines),
- streaming=True)
+ HTTPretty.register_uri(
+ HTTPretty.POST,
+ TWITTER_STREAMING_URL,
+ body=(l for l in twitter_response_lines),
+ streaming=True,
+ )
# taken from the requests docs
# Http://docs.python-requests.org/en/latest/user/advanced/# streaming-requests
- response = requests.post(TWITTER_STREAMING_URL, data={'track': 'requests'},
- auth=('username', 'password'), stream=True)
+ response = requests.post(
+ TWITTER_STREAMING_URL,
+ data={"track": "requests"},
+ auth=("username", "password"),
+ stream=True,
+ )
# test iterating by line
line_iter = response.iter_lines()
- with in_time(0.01, 'Iterating by line is taking forever!'):
+ with in_time(0.01, "Iterating by line is taking forever!"):
for i in xrange(len(twitter_response_lines)):
- expect(next(line_iter).strip()).to.equal(
- twitter_response_lines[i].strip())
+ expect(next(line_iter).strip()).to.equal(twitter_response_lines[i].strip())
# test iterating by line after a second request
response = requests.post(
TWITTER_STREAMING_URL,
- data={
- 'track': 'requests'
- },
- auth=('username', 'password'),
+ data={"track": "requests"},
+ auth=("username", "password"),
stream=True,
)
line_iter = response.iter_lines()
- with in_time(0.01, 'Iterating by line is taking forever the second time '
- 'around!'):
+ with in_time(
+ 0.01, "Iterating by line is taking forever the second time " "around!"
+ ):
for i in xrange(len(twitter_response_lines)):
- expect(next(line_iter).strip()).to.equal(
- twitter_response_lines[i].strip())
+ expect(next(line_iter).strip()).to.equal(twitter_response_lines[i].strip())
# test iterating by char
response = requests.post(
TWITTER_STREAMING_URL,
- data={
- 'track': 'requests'
- },
- auth=('username', 'password'),
- stream=True
+ data={"track": "requests"},
+ auth=("username", "password"),
+ stream=True,
)
- twitter_expected_response_body = b''.join(twitter_response_lines)
- with in_time(0.02, 'Iterating by char is taking forever!'):
- twitter_body = b''.join(c for c in response.iter_content(chunk_size=1))
+ twitter_expected_response_body = b"".join(twitter_response_lines)
+ with in_time(0.02, "Iterating by char is taking forever!"):
+ twitter_body = b"".join(c for c in response.iter_content(chunk_size=1))
expect(twitter_body).to.equal(twitter_expected_response_body)
# test iterating by chunks larger than the stream
- response = requests.post(TWITTER_STREAMING_URL, data={'track': 'requests'},
- auth=('username', 'password'), stream=True)
+ response = requests.post(
+ TWITTER_STREAMING_URL,
+ data={"track": "requests"},
+ auth=("username", "password"),
+ stream=True,
+ )
- with in_time(0.02, 'Iterating by large chunks is taking forever!'):
- twitter_body = b''.join(c for c in
- response.iter_content(chunk_size=1024))
+ with in_time(0.02, "Iterating by large chunks is taking forever!"):
+ twitter_body = b"".join(c for c in response.iter_content(chunk_size=1024))
expect(twitter_body).to.equal(twitter_expected_response_body)
@httprettified
def test_multiline():
- url = 'http://httpbin.org/post'
- data = b'content=Im\r\na multiline\r\n\r\nsentence\r\n'
+ url = "http://httpbin.org/post"
+ data = b"content=Im\r\na multiline\r\n\r\nsentence\r\n"
headers = {
- 'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
- 'Accept': 'text/plain',
+ "Content-Type": "application/x-www-form-urlencoded; charset=utf-8",
+ "Accept": "text/plain",
}
- HTTPretty.register_uri(
- HTTPretty.POST,
- url,
- )
+ HTTPretty.register_uri(HTTPretty.POST, url)
response = requests.post(url, data=data, headers=headers)
expect(response.status_code).to.equal(200)
- expect(HTTPretty.last_request.method).to.equal('POST')
- expect(HTTPretty.last_request.path).to.equal('/post')
+ expect(HTTPretty.last_request.method).to.equal("POST")
+ expect(HTTPretty.last_request.path).to.equal("/post")
expect(HTTPretty.last_request.body).to.equal(data)
- expect(HTTPretty.last_request.headers['content-length']).to.equal('37')
- expect(HTTPretty.last_request.headers['content-type']).to.equal('application/x-www-form-urlencoded; charset=utf-8')
+ expect(HTTPretty.last_request.headers["content-length"]).to.equal("37")
+ expect(HTTPretty.last_request.headers["content-type"]).to.equal(
+ "application/x-www-form-urlencoded; charset=utf-8"
+ )
expect(len(HTTPretty.latest_requests)).to.equal(1)
@httprettified
def test_octet_stream():
- url = 'http://httpbin.org/post'
+ url = "http://httpbin.org/post"
data = b"\xf5\x00\x00\x00" # utf-8 with invalid start byte
- headers = {
- 'Content-Type': 'application/octet-stream',
- }
- HTTPretty.register_uri(
- HTTPretty.POST,
- url,
- )
+ headers = {"Content-Type": "application/octet-stream"}
+ HTTPretty.register_uri(HTTPretty.POST, url)
response = requests.post(url, data=data, headers=headers)
expect(response.status_code).to.equal(200)
- expect(HTTPretty.last_request.method).to.equal('POST')
- expect(HTTPretty.last_request.path).to.equal('/post')
+ expect(HTTPretty.last_request.method).to.equal("POST")
+ expect(HTTPretty.last_request.path).to.equal("/post")
expect(HTTPretty.last_request.body).to.equal(data)
- expect(HTTPretty.last_request.headers['content-length']).to.equal('4')
- expect(HTTPretty.last_request.headers['content-type']).to.equal('application/octet-stream')
+ expect(HTTPretty.last_request.headers["content-length"]).to.equal("4")
+ expect(HTTPretty.last_request.headers["content-type"]).to.equal(
+ "application/octet-stream"
+ )
expect(len(HTTPretty.latest_requests)).to.equal(1)
@httprettified
def test_multipart():
- url = 'http://httpbin.org/post'
+ url = "http://httpbin.org/post"
data = b'--xXXxXXyYYzzz\r\nContent-Disposition: form-data; name="content"\r\nContent-Type: text/plain; charset=utf-8\r\nContent-Length: 68\r\n\r\nAction: comment\nText: Comment with attach\nAttachment: x1.txt, x2.txt\r\n--xXXxXXyYYzzz\r\nContent-Disposition: form-data; name="attachment_2"; filename="x.txt"\r\nContent-Type: text/plain\r\nContent-Length: 4\r\n\r\nbye\n\r\n--xXXxXXyYYzzz\r\nContent-Disposition: form-data; name="attachment_1"; filename="x.txt"\r\nContent-Type: text/plain\r\nContent-Length: 4\r\n\r\nbye\n\r\n--xXXxXXyYYzzz--\r\n'
- headers = {'Content-Length': '495', 'Content-Type': 'multipart/form-data; boundary=xXXxXXyYYzzz', 'Accept': 'text/plain'}
- HTTPretty.register_uri(
- HTTPretty.POST,
- url,
- )
+ headers = {
+ "Content-Length": "495",
+ "Content-Type": "multipart/form-data; boundary=xXXxXXyYYzzz",
+ "Accept": "text/plain",
+ }
+ HTTPretty.register_uri(HTTPretty.POST, url)
response = requests.post(url, data=data, headers=headers)
expect(response.status_code).to.equal(200)
- expect(HTTPretty.last_request.method).to.equal('POST')
- expect(HTTPretty.last_request.path).to.equal('/post')
+ expect(HTTPretty.last_request.method).to.equal("POST")
+ expect(HTTPretty.last_request.path).to.equal("/post")
expect(HTTPretty.last_request.body).to.equal(data)
- expect(HTTPretty.last_request.headers['content-length']).to.equal('495')
- expect(HTTPretty.last_request.headers['content-type']).to.equal('multipart/form-data; boundary=xXXxXXyYYzzz')
+ expect(HTTPretty.last_request.headers["content-length"]).to.equal("495")
+ expect(HTTPretty.last_request.headers["content-type"]).to.equal(
+ "multipart/form-data; boundary=xXXxXXyYYzzz"
+ )
expect(len(HTTPretty.latest_requests)).to.equal(1)
@httprettified
@within(two=microseconds)
def test_callback_response(now):
- ("HTTPretty should call a callback function and set its return value as the body of the response"
- " requests")
+ (
+ "HTTPretty should call a callback function and set its return value as the body of the response"
+ " requests"
+ )
def request_callback(request, uri, headers):
- return [200, headers, "The {} response from {}".format(decode_utf8(request.method), uri)]
+ return [
+ 200,
+ headers,
+ "The {} response from {}".format(decode_utf8(request.method), uri),
+ ]
HTTPretty.register_uri(
- HTTPretty.GET, "https://api.yahoo.com/test",
- body=request_callback)
+ HTTPretty.GET, "https://api.yahoo.com/test", body=request_callback
+ )
- response = requests.get('https://api.yahoo.com/test')
+ response = requests.get("https://api.yahoo.com/test")
expect(response.text).to.equal("The GET response from https://api.yahoo.com/test")
HTTPretty.register_uri(
- HTTPretty.POST, "https://api.yahoo.com/test_post",
- body=request_callback)
+ HTTPretty.POST, "https://api.yahoo.com/test_post", body=request_callback
+ )
response = requests.post(
- "https://api.yahoo.com/test_post",
- {"username": "gabrielfalcao"}
+ "https://api.yahoo.com/test_post", {"username": "gabrielfalcao"}
)
- expect(response.text).to.equal("The POST response from https://api.yahoo.com/test_post")
+ expect(response.text).to.equal(
+ "The POST response from https://api.yahoo.com/test_post"
+ )
@httprettified
@within(two=microseconds)
def test_callback_body_remains_callable_for_any_subsequent_requests(now):
- ("HTTPretty should call a callback function more than one"
- " requests")
+ ("HTTPretty should call a callback function more than one" " requests")
def request_callback(request, uri, headers):
- return [200, headers, "The {} response from {}".format(decode_utf8(request.method), uri)]
+ return [
+ 200,
+ headers,
+ "The {} response from {}".format(decode_utf8(request.method), uri),
+ ]
HTTPretty.register_uri(
- HTTPretty.GET, "https://api.yahoo.com/test",
- body=request_callback)
+ HTTPretty.GET, "https://api.yahoo.com/test", body=request_callback
+ )
- response = requests.get('https://api.yahoo.com/test')
+ response = requests.get("https://api.yahoo.com/test")
expect(response.text).to.equal("The GET response from https://api.yahoo.com/test")
- response = requests.get('https://api.yahoo.com/test')
+ response = requests.get("https://api.yahoo.com/test")
expect(response.text).to.equal("The GET response from https://api.yahoo.com/test")
@httprettified
@within(two=microseconds)
def test_callback_setting_headers_and_status_response(now):
- ("HTTPretty should call a callback function and uses it retur tuple as status code, headers and body"
- " requests")
+ (
+ "HTTPretty should call a callback function and uses it retur tuple as status code, headers and body"
+ " requests"
+ )
def request_callback(request, uri, headers):
- headers.update({'a': 'b'})
- return [418, headers, "The {} response from {}".format(decode_utf8(request.method), uri)]
+ headers.update({"a": "b"})
+ return [
+ 418,
+ headers,
+ "The {} response from {}".format(decode_utf8(request.method), uri),
+ ]
HTTPretty.register_uri(
- HTTPretty.GET, "https://api.yahoo.com/test",
- body=request_callback)
+ HTTPretty.GET, "https://api.yahoo.com/test", body=request_callback
+ )
- response = requests.get('https://api.yahoo.com/test')
+ response = requests.get("https://api.yahoo.com/test")
expect(response.text).to.equal("The GET response from https://api.yahoo.com/test")
- expect(response.headers).to.have.key('a').being.equal("b")
+ expect(response.headers).to.have.key("a").being.equal("b")
expect(response.status_code).to.equal(418)
HTTPretty.register_uri(
- HTTPretty.POST, "https://api.yahoo.com/test_post",
- body=request_callback)
+ HTTPretty.POST, "https://api.yahoo.com/test_post", body=request_callback
+ )
response = requests.post(
- "https://api.yahoo.com/test_post",
- {"username": "gabrielfalcao"}
+ "https://api.yahoo.com/test_post", {"username": "gabrielfalcao"}
)
- expect(response.text).to.equal("The POST response from https://api.yahoo.com/test_post")
- expect(response.headers).to.have.key('a').being.equal("b")
+ expect(response.text).to.equal(
+ "The POST response from https://api.yahoo.com/test_post"
+ )
+ expect(response.headers).to.have.key("a").being.equal("b")
expect(response.status_code).to.equal(418)
@httprettified
def test_httpretty_should_respect_matcher_priority():
HTTPretty.register_uri(
- HTTPretty.GET,
- re.compile(r".*"),
- body='high priority',
- priority=5,
+ HTTPretty.GET, re.compile(r".*"), body="high priority", priority=5
)
HTTPretty.register_uri(
- HTTPretty.GET,
- re.compile(r".+"),
- body='low priority',
- priority=0,
+ HTTPretty.GET, re.compile(r".+"), body="low priority", priority=0
)
- response = requests.get('http://api.yipit.com/v1/')
- expect(response.text).to.equal('high priority')
+ response = requests.get("http://api.yipit.com/v1/")
+ expect(response.text).to.equal("high priority")
@httprettified
@within(two=microseconds)
def test_callback_setting_content_length_on_head(now):
- ("HTTPretty should call a callback function, use it's return tuple as status code, headers and body"
- " requests and respect the content-length header when responding to HEAD")
+ (
+ "HTTPretty should call a callback function, use it's return tuple as status code, headers and body"
+ " requests and respect the content-length header when responding to HEAD"
+ )
def request_callback(request, uri, headers):
- headers.update({'content-length': 12345})
+ headers.update({"content-length": 12345})
return [200, headers, ""]
HTTPretty.register_uri(
- HTTPretty.HEAD, "https://api.yahoo.com/test",
- body=request_callback)
+ HTTPretty.HEAD, "https://api.yahoo.com/test", body=request_callback
+ )
- response = requests.head('https://api.yahoo.com/test')
- expect(response.headers).to.have.key('content-length').being.equal("12345")
+ response = requests.head("https://api.yahoo.com/test")
+ expect(response.headers).to.have.key("content-length").being.equal("12345")
expect(response.status_code).to.equal(200)
@@ -565,14 +592,20 @@ def test_httpretty_should_allow_registering_regexes_and_give_a_proper_match_to_t
HTTPretty.register_uri(
HTTPretty.GET,
re.compile(r"https://api.yipit.com/v1/deal;brand=(?P<brand_name>\w+)"),
- body=lambda method, uri, headers: [200, headers, uri]
+ body=lambda method, uri, headers: [200, headers, uri],
)
- response = requests.get('https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris')
+ response = requests.get(
+ "https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris"
+ )
- expect(response.text).to.equal('https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris')
- expect(HTTPretty.last_request.method).to.equal('GET')
- expect(HTTPretty.last_request.path).to.equal('/v1/deal;brand=gap?first_name=chuck&last_name=norris')
+ expect(response.text).to.equal(
+ "https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris"
+ )
+ expect(HTTPretty.last_request.method).to.equal("GET")
+ expect(HTTPretty.last_request.path).to.equal(
+ "/v1/deal;brand=gap?first_name=chuck&last_name=norris"
+ )
@httprettified
@@ -585,11 +618,14 @@ def test_httpretty_should_allow_registering_regexes():
body="Found brand",
)
- response = requests.get('https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris'
- )
- expect(response.text).to.equal('Found brand')
- expect(HTTPretty.last_request.method).to.equal('GET')
- expect(HTTPretty.last_request.path).to.equal('/v1/deal;brand=gap?first_name=chuck&last_name=norris')
+ response = requests.get(
+ "https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris"
+ )
+ expect(response.text).to.equal("Found brand")
+ expect(HTTPretty.last_request.method).to.equal("GET")
+ expect(HTTPretty.last_request.path).to.equal(
+ "/v1/deal;brand=gap?first_name=chuck&last_name=norris"
+ )
@httprettified
@@ -599,15 +635,16 @@ def test_httpretty_provides_easy_access_to_querystrings_with_regexes():
HTTPretty.register_uri(
HTTPretty.GET,
re.compile(r"https://api.yipit.com/v1/(?P<endpoint>\w+)/$"),
- body="Find the best daily deals"
+ body="Find the best daily deals",
)
- response = requests.get('https://api.yipit.com/v1/deals/?foo=bar&foo=baz&chuck=norris')
+ response = requests.get(
+ "https://api.yipit.com/v1/deals/?foo=bar&foo=baz&chuck=norris"
+ )
expect(response.text).to.equal("Find the best daily deals")
- expect(HTTPretty.last_request.querystring).to.equal({
- 'foo': ['bar', 'baz'],
- 'chuck': ['norris'],
- })
+ expect(HTTPretty.last_request.querystring).to.equal(
+ {"foo": ["bar", "baz"], "chuck": ["norris"]}
+ )
@httprettified
@@ -619,28 +656,24 @@ def test_httpretty_allows_to_chose_if_querystring_should_be_matched():
HTTPretty.GET,
re.compile(r"https://example.org/(?P<endpoint>\w+)/$"),
body="Nudge, nudge, wink, wink. Know what I mean?",
- match_querystring=True
+ match_querystring=True,
)
- response = requests.get('https://example.org/what/')
- expect(response.text).to.equal('Nudge, nudge, wink, wink. Know what I mean?')
+ response = requests.get("https://example.org/what/")
+ expect(response.text).to.equal("Nudge, nudge, wink, wink. Know what I mean?")
- response = requests.get('https://example.org/what/?flying=coconuts')
- expect(response.text).to.not_be.equal('Nudge, nudge, wink, wink. Know what I mean?')
+ response = requests.get("https://example.org/what/?flying=coconuts")
+ expect(response.text).to.not_be.equal("Nudge, nudge, wink, wink. Know what I mean?")
@httprettified
def test_httpretty_should_allow_multiple_methods_for_the_same_uri():
"HTTPretty should allow registering multiple methods for the same uri"
- url = 'http://test.com/test'
- methods = ['GET', 'POST', 'PUT', 'OPTIONS']
+ url = "http://test.com/test"
+ methods = ["GET", "POST", "PUT", "OPTIONS"]
for method in methods:
- HTTPretty.register_uri(
- getattr(HTTPretty, method),
- url,
- method
- )
+ HTTPretty.register_uri(getattr(HTTPretty, method), url, method)
for method in methods:
request_action = getattr(requests, method.lower())
@@ -651,10 +684,11 @@ def test_httpretty_should_allow_multiple_methods_for_the_same_uri():
def test_httpretty_should_allow_registering_regexes_with_streaming_responses():
"HTTPretty should allow registering regexes with streaming responses"
import os
- os.environ['DEBUG'] = 'true'
+
+ os.environ["DEBUG"] = "true"
def my_callback(request, url, headers):
- request.body.should.equal(b'hithere')
+ request.body.should.equal(b"hithere")
return 200, headers, "Received"
HTTPretty.register_uri(
@@ -664,52 +698,50 @@ def test_httpretty_should_allow_registering_regexes_with_streaming_responses():
)
def gen():
- yield b'hi'
- yield b'there'
+ yield b"hi"
+ yield b"there"
response = requests.post(
- 'https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris',
+ "https://api.yipit.com/v1/deal;brand=gap?first_name=chuck&last_name=norris",
data=gen(),
)
expect(response.content).to.equal(b"Received")
- expect(HTTPretty.last_request.method).to.equal('POST')
- expect(HTTPretty.last_request.path).to.equal('/v1/deal;brand=gap?first_name=chuck&last_name=norris')
+ expect(HTTPretty.last_request.method).to.equal("POST")
+ expect(HTTPretty.last_request.path).to.equal(
+ "/v1/deal;brand=gap?first_name=chuck&last_name=norris"
+ )
@httprettified
def test_httpretty_should_allow_multiple_responses_with_multiple_methods():
"HTTPretty should allow multiple responses when binding multiple methods to the same uri"
- url = 'http://test.com/list'
+ url = "http://test.com/list"
# add get responses
HTTPretty.register_uri(
- HTTPretty.GET, url,
- responses=[
- HTTPretty.Response(body='a'),
- HTTPretty.Response(body='b'),
- ]
+ HTTPretty.GET,
+ url,
+ responses=[HTTPretty.Response(body="a"), HTTPretty.Response(body="b")],
)
# add post responses
HTTPretty.register_uri(
- HTTPretty.POST, url,
- responses=[
- HTTPretty.Response(body='c'),
- HTTPretty.Response(body='d'),
- ]
+ HTTPretty.POST,
+ url,
+ responses=[HTTPretty.Response(body="c"), HTTPretty.Response(body="d")],
)
- expect(requests.get(url).text).to.equal('a')
- expect(requests.post(url).text).to.equal('c')
+ expect(requests.get(url).text).to.equal("a")
+ expect(requests.post(url).text).to.equal("c")
- expect(requests.get(url).text).to.equal('b')
- expect(requests.get(url).text).to.equal('b')
- expect(requests.get(url).text).to.equal('b')
+ expect(requests.get(url).text).to.equal("b")
+ expect(requests.get(url).text).to.equal("b")
+ expect(requests.get(url).text).to.equal("b")
- expect(requests.post(url).text).to.equal('d')
- expect(requests.post(url).text).to.equal('d')
- expect(requests.post(url).text).to.equal('d')
+ expect(requests.post(url).text).to.equal("d")
+ expect(requests.post(url).text).to.equal("d")
+ expect(requests.post(url).text).to.equal("d")
@httprettified
@@ -717,19 +749,18 @@ def test_httpretty_should_normalize_url_patching():
"HTTPretty should normalize all url patching"
HTTPretty.register_uri(
- HTTPretty.GET,
- "http://yipit.com/foo(bar)",
- body="Find the best daily deals")
+ HTTPretty.GET, "http://yipit.com/foo(bar)", body="Find the best daily deals"
+ )
- response = requests.get('http://yipit.com/foo%28bar%29')
- expect(response.text).to.equal('Find the best daily deals')
+ response = requests.get("http://yipit.com/foo%28bar%29")
+ expect(response.text).to.equal("Find the best daily deals")
@httprettified
def test_lack_of_trailing_slash():
("HTTPretty should automatically append a slash to given urls")
- url = 'http://www.youtube.com'
- HTTPretty.register_uri(HTTPretty.GET, url, body='')
+ url = "http://www.youtube.com"
+ HTTPretty.register_uri(HTTPretty.GET, url, body="")
response = requests.get(url)
response.status_code.should.equal(200)
@@ -737,10 +768,13 @@ def test_lack_of_trailing_slash():
@httprettified
def test_unicode_querystrings():
("Querystrings should accept unicode characters")
- HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/login",
- body="Find the best daily deals")
- requests.get('http://yipit.com/login?user=Gabriel+Falcão')
- expect(HTTPretty.last_request.querystring['user'][0]).should.be.equal('Gabriel Falcão')
+ HTTPretty.register_uri(
+ HTTPretty.GET, "http://yipit.com/login", body="Find the best daily deals"
+ )
+ requests.get("http://yipit.com/login?user=Gabriel+Falcão")
+ expect(HTTPretty.last_request.querystring["user"][0]).should.be.equal(
+ "Gabriel Falcão"
+ )
@use_tornado_server
@@ -752,9 +786,11 @@ def test_recording_calls(port):
# When I record some calls
with HTTPretty.record(destination):
requests.get(server_url("/foobar?name=Gabriel&age=25", port))
- requests.post(server_url("/foobar", port),
- data=json.dumps({'test': '123'}),
- headers={"Test": "foobar"})
+ requests.post(
+ server_url("/foobar", port),
+ data=json.dumps({"test": "123"}),
+ headers={"Test": "foobar"},
+ )
# Then the destination path should exist
os.path.exists(destination).should.be.true
@@ -773,25 +809,24 @@ def test_recording_calls(port):
response.should.have.key("request").being.length_of(5)
response.should.have.key("response").being.length_of(3)
- response['request'].should.have.key("method").being.equal("GET")
- response['request'].should.have.key("headers").being.a(dict)
- response['request'].should.have.key("querystring").being.equal({
- "age": [
- "25"
- ],
- "name": [
- "Gabriel"
- ]
- })
- response['response'].should.have.key("status").being.equal(200)
- response['response'].should.have.key("body").being.an(text_type)
- response['response'].should.have.key("headers").being.a(dict)
+ response["request"].should.have.key("method").being.equal("GET")
+ response["request"].should.have.key("headers").being.a(dict)
+ response["request"].should.have.key("querystring").being.equal(
+ {"age": ["25"], "name": ["Gabriel"]}
+ )
+ response["response"].should.have.key("status").being.equal(200)
+ response["response"].should.have.key("body").being.an(text_type)
+ response["response"].should.have.key("headers").being.a(dict)
# older urllib3 had a bug where header keys were lower-cased:
# https://github.com/shazow/urllib3/issues/236
# cope with that
- if 'server' in response['response']["headers"]:
- response['response']["headers"]["Server"] = response['response']["headers"].pop("server")
- response['response']["headers"].should.have.key("Server").being.equal("TornadoServer/" + tornado_version)
+ if "server" in response["response"]["headers"]:
+ response["response"]["headers"]["Server"] = response["response"]["headers"].pop(
+ "server"
+ )
+ response["response"]["headers"].should.have.key("Server").being.equal(
+ "TornadoServer/" + tornado_version
+ )
# And When I playback the previously recorded calls
with HTTPretty.playback(destination):
@@ -799,7 +834,7 @@ def test_recording_calls(port):
response1 = requests.get(server_url("/foobar?name=Gabriel&age=25", port))
response2 = requests.post(
server_url("/foobar", port),
- data=json.dumps({'test': '123'}),
+ data=json.dumps({"test": "123"}),
headers={"Test": "foobar"},
)
@@ -813,26 +848,29 @@ def test_recording_calls(port):
@httprettified
def test_py26_callback_response():
- ("HTTPretty should call a callback function *once* and set its return value"
- " as the body of the response requests")
+ (
+ "HTTPretty should call a callback function *once* and set its return value"
+ " as the body of the response requests"
+ )
from mock import Mock
def _request_callback(request, uri, headers):
- return [200, headers, "The {} response from {}".format(decode_utf8(request.method), uri)]
+ return [
+ 200,
+ headers,
+ "The {} response from {}".format(decode_utf8(request.method), uri),
+ ]
request_callback = Mock()
request_callback.side_effect = _request_callback
HTTPretty.register_uri(
- HTTPretty.POST, "https://api.yahoo.com/test_post",
- body=request_callback)
-
- requests.post(
- "https://api.yahoo.com/test_post",
- {"username": "gabrielfalcao"}
+ HTTPretty.POST, "https://api.yahoo.com/test_post", body=request_callback
)
- os.environ['STOP'] = 'true'
+
+ requests.post("https://api.yahoo.com/test_post", {"username": "gabrielfalcao"})
+ os.environ["STOP"] = "true"
expect(request_callback.call_count).equal(1)
@@ -843,25 +881,31 @@ def test_httpretty_should_work_with_non_standard_ports():
HTTPretty.register_uri(
HTTPretty.GET,
re.compile(r"https://api.yipit.com:1234/v1/deal;brand=(?P<brand_name>\w+)"),
- body=lambda method, uri, headers: [200, headers, uri]
+ body=lambda method, uri, headers: [200, headers, uri],
)
HTTPretty.register_uri(
HTTPretty.POST,
"https://asdf.com:666/meow",
- body=lambda method, uri, headers: [200, headers, uri]
+ body=lambda method, uri, headers: [200, headers, uri],
)
- response = requests.get('https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris')
+ response = requests.get(
+ "https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris"
+ )
- expect(response.text).to.equal('https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris')
- expect(HTTPretty.last_request.method).to.equal('GET')
- expect(HTTPretty.last_request.path).to.equal('/v1/deal;brand=gap?first_name=chuck&last_name=norris')
+ expect(response.text).to.equal(
+ "https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris"
+ )
+ expect(HTTPretty.last_request.method).to.equal("GET")
+ expect(HTTPretty.last_request.path).to.equal(
+ "/v1/deal;brand=gap?first_name=chuck&last_name=norris"
+ )
- response = requests.post('https://asdf.com:666/meow')
+ response = requests.post("https://asdf.com:666/meow")
- expect(response.text).to.equal('https://asdf.com:666/meow')
- expect(HTTPretty.last_request.method).to.equal('POST')
- expect(HTTPretty.last_request.path).to.equal('/meow')
+ expect(response.text).to.equal("https://asdf.com:666/meow")
+ expect(HTTPretty.last_request.method).to.equal("POST")
+ expect(HTTPretty.last_request.path).to.equal("/meow")
@httprettified
@@ -871,28 +915,28 @@ def test_httpretty_reset_by_switching_protocols_for_same_port():
HTTPretty.register_uri(
HTTPretty.GET,
"http://api.yipit.com:1234/v1/deal",
- body=lambda method, uri, headers: [200, headers, uri]
+ body=lambda method, uri, headers: [200, headers, uri],
)
- response = requests.get('http://api.yipit.com:1234/v1/deal')
+ response = requests.get("http://api.yipit.com:1234/v1/deal")
- expect(response.text).to.equal('http://api.yipit.com:1234/v1/deal')
- expect(HTTPretty.last_request.method).to.equal('GET')
- expect(HTTPretty.last_request.path).to.equal('/v1/deal')
+ expect(response.text).to.equal("http://api.yipit.com:1234/v1/deal")
+ expect(HTTPretty.last_request.method).to.equal("GET")
+ expect(HTTPretty.last_request.path).to.equal("/v1/deal")
HTTPretty.reset()
HTTPretty.register_uri(
HTTPretty.GET,
"https://api.yipit.com:1234/v1/deal",
- body=lambda method, uri, headers: [200, headers, uri]
+ body=lambda method, uri, headers: [200, headers, uri],
)
- response = requests.get('https://api.yipit.com:1234/v1/deal')
+ response = requests.get("https://api.yipit.com:1234/v1/deal")
- expect(response.text).to.equal('https://api.yipit.com:1234/v1/deal')
- expect(HTTPretty.last_request.method).to.equal('GET')
- expect(HTTPretty.last_request.path).to.equal('/v1/deal')
+ expect(response.text).to.equal("https://api.yipit.com:1234/v1/deal")
+ expect(HTTPretty.last_request.method).to.equal("GET")
+ expect(HTTPretty.last_request.path).to.equal("/v1/deal")
@httprettified
@@ -902,11 +946,17 @@ def test_httpretty_should_allow_registering_regexes_with_port_and_give_a_proper_
HTTPretty.register_uri(
HTTPretty.GET,
re.compile(r"https://api.yipit.com:1234/v1/deal;brand=(?P<brand_name>\w+)"),
- body=lambda method, uri, headers: [200, headers, uri]
+ body=lambda method, uri, headers: [200, headers, uri],
)
- response = requests.get('https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris')
+ response = requests.get(
+ "https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris"
+ )
- expect(response.text).to.equal('https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris')
- expect(HTTPretty.last_request.method).to.equal('GET')
- expect(HTTPretty.last_request.path).to.equal('/v1/deal;brand=gap?first_name=chuck&last_name=norris')
+ expect(response.text).to.equal(
+ "https://api.yipit.com:1234/v1/deal;brand=gap?first_name=chuck&last_name=norris"
+ )
+ expect(HTTPretty.last_request.method).to.equal("GET")
+ expect(HTTPretty.last_request.path).to.equal(
+ "/v1/deal;brand=gap?first_name=chuck&last_name=norris"
+ )
diff --git a/tests/functional/test_urllib2.py b/tests/functional/test_urllib2.py
index c2a1732..a101487 100644
--- a/tests/functional/test_urllib2.py
+++ b/tests/functional/test_urllib2.py
@@ -27,11 +27,13 @@
from __future__ import unicode_literals
import re
+
try:
from urllib.request import urlopen
import urllib.request as urllib2
except ImportError:
import urllib2
+
urlopen = urllib2.urlopen
from sure import within, microseconds
@@ -44,14 +46,15 @@ from httpretty.core import decode_utf8
def test_httpretty_should_mock_a_simple_get_with_urllib2_read():
"HTTPretty should mock a simple GET with urllib2.read()"
- HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/",
- body="Find the best daily deals")
+ HTTPretty.register_uri(
+ HTTPretty.GET, "http://yipit.com/", body="Find the best daily deals"
+ )
- fd = urlopen('http://yipit.com')
+ fd = urlopen("http://yipit.com")
got = fd.read()
fd.close()
- got.should.equal(b'Find the best daily deals')
+ got.should.equal(b"Find the best daily deals")
@httprettified
@@ -59,17 +62,17 @@ def test_httpretty_should_mock_a_simple_get_with_urllib2_read():
def test_httpretty_provides_easy_access_to_querystrings(now):
"HTTPretty should provide an easy access to the querystring"
- HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/",
- body="Find the best daily deals")
+ HTTPretty.register_uri(
+ HTTPretty.GET, "http://yipit.com/", body="Find the best daily deals"
+ )
- fd = urllib2.urlopen('http://yipit.com/?foo=bar&foo=baz&chuck=norris')
+ fd = urllib2.urlopen("http://yipit.com/?foo=bar&foo=baz&chuck=norris")
fd.read()
fd.close()
- HTTPretty.last_request.querystring.should.equal({
- 'foo': ['bar', 'baz'],
- 'chuck': ['norris'],
- })
+ HTTPretty.last_request.querystring.should.equal(
+ {"foo": ["bar", "baz"], "chuck": ["norris"]}
+ )
@httprettified
@@ -77,24 +80,29 @@ def test_httpretty_provides_easy_access_to_querystrings(now):
def test_httpretty_should_mock_headers_urllib2(now):
"HTTPretty should mock basic headers with urllib2"
- HTTPretty.register_uri(HTTPretty.GET, "http://github.com/",
- body="this is supposed to be the response",
- status=201)
+ HTTPretty.register_uri(
+ HTTPretty.GET,
+ "http://github.com/",
+ body="this is supposed to be the response",
+ status=201,
+ )
- request = urlopen('http://github.com')
+ request = urlopen("http://github.com")
headers = dict(request.headers)
request.close()
request.code.should.equal(201)
- headers.should.equal({
- 'content-type': 'text/plain; charset=utf-8',
- 'connection': 'close',
- 'content-length': '35',
- 'status': '201',
- 'server': 'Python/HTTPretty',
- 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
- })
+ headers.should.equal(
+ {
+ "content-type": "text/plain; charset=utf-8",
+ "connection": "close",
+ "content-length": "35",
+ "status": "201",
+ "server": "Python/HTTPretty",
+ "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"),
+ }
+ )
@httprettified
@@ -102,27 +110,32 @@ def test_httpretty_should_mock_headers_urllib2(now):
def test_httpretty_should_allow_adding_and_overwritting_urllib2(now):
"HTTPretty should allow adding and overwritting headers with urllib2"
- HTTPretty.register_uri(HTTPretty.GET, "http://github.com/",
- body="this is supposed to be the response",
- adding_headers={
- 'Server': 'Apache',
- 'Content-Length': '27',
- 'Content-Type': 'application/json',
- })
+ HTTPretty.register_uri(
+ HTTPretty.GET,
+ "http://github.com/",
+ body="this is supposed to be the response",
+ adding_headers={
+ "Server": "Apache",
+ "Content-Length": "27",
+ "Content-Type": "application/json",
+ },
+ )
- request = urlopen('http://github.com')
+ request = urlopen("http://github.com")
headers = dict(request.headers)
request.close()
request.code.should.equal(200)
- headers.should.equal({
- 'content-type': 'application/json',
- 'connection': 'close',
- 'content-length': '27',
- 'status': '200',
- 'server': 'Apache',
- 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
- })
+ headers.should.equal(
+ {
+ "content-type": "application/json",
+ "connection": "close",
+ "content-length": "27",
+ "status": "200",
+ "server": "Apache",
+ "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"),
+ }
+ )
@httprettified
@@ -130,82 +143,87 @@ def test_httpretty_should_allow_adding_and_overwritting_urllib2(now):
def test_httpretty_should_allow_forcing_headers_urllib2():
"HTTPretty should allow forcing headers with urllib2"
- HTTPretty.register_uri(HTTPretty.GET, "http://github.com/",
- body="this is supposed to be the response",
- forcing_headers={
- 'Content-Type': 'application/xml',
- 'Content-Length': '35a',
- })
+ HTTPretty.register_uri(
+ HTTPretty.GET,
+ "http://github.com/",
+ body="this is supposed to be the response",
+ forcing_headers={"Content-Type": "application/xml", "Content-Length": "35a"},
+ )
- request = urlopen('http://github.com')
+ request = urlopen("http://github.com")
headers = dict(request.headers)
request.close()
- headers.should.equal({
- 'content-type': 'application/xml',
- 'content-length': '35a',
- })
+ headers.should.equal({"content-type": "application/xml", "content-length": "35a"})
@httprettified
@within(two=microseconds)
def test_httpretty_should_allow_adding_and_overwritting_by_kwargs_u2(now):
- ("HTTPretty should allow adding and overwritting headers by "
- "keyword args with urllib2")
+ (
+ "HTTPretty should allow adding and overwritting headers by "
+ "keyword args with urllib2"
+ )
body = "this is supposed to be the response, indeed"
- HTTPretty.register_uri(HTTPretty.GET, "http://github.com/",
- body=body,
- server='Apache',
- content_length=len(body),
- content_type='application/json')
+ HTTPretty.register_uri(
+ HTTPretty.GET,
+ "http://github.com/",
+ body=body,
+ server="Apache",
+ content_length=len(body),
+ content_type="application/json",
+ )
- request = urlopen('http://github.com')
+ request = urlopen("http://github.com")
headers = dict(request.headers)
request.close()
request.code.should.equal(200)
- headers.should.equal({
- 'content-type': 'application/json',
- 'connection': 'close',
- 'content-length': str(len(body)),
- 'status': '200',
- 'server': 'Apache',
- 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'),
- })
+ headers.should.equal(
+ {
+ "content-type": "application/json",
+ "connection": "close",
+ "content-length": str(len(body)),
+ "status": "200",
+ "server": "Apache",
+ "date": now.strftime("%a, %d %b %Y %H:%M:%S GMT"),
+ }
+ )
@httprettified
@within(two=microseconds)
def test_httpretty_should_support_a_list_of_successive_responses_urllib2(now):
- ("HTTPretty should support adding a list of successive "
- "responses with urllib2")
+ ("HTTPretty should support adding a list of successive " "responses with urllib2")
HTTPretty.register_uri(
- HTTPretty.GET, "https://api.yahoo.com/test",
+ HTTPretty.GET,
+ "https://api.yahoo.com/test",
responses=[
HTTPretty.Response(body="first response", status=201),
- HTTPretty.Response(body='second and last response', status=202),
- ])
+ HTTPretty.Response(body="second and last response", status=202),
+ ],
+ )
- request1 = urlopen('https://api.yahoo.com/test')
+ request1 = urlopen("https://api.yahoo.com/test")
body1 = request1.read()
request1.close()
request1.code.should.equal(201)
- body1.should.equal(b'first response')
+ body1.should.equal(b"first response")
- request2 = urlopen('https://api.yahoo.com/test')
+ request2 = urlopen("https://api.yahoo.com/test")
body2 = request2.read()
request2.close()
request2.code.should.equal(202)
- body2.should.equal(b'second and last response')
+ body2.should.equal(b"second and last response")
- request3 = urlopen('https://api.yahoo.com/test')
+ request3 = urlopen("https://api.yahoo.com/test")
body3 = request3.read()
request3.close()
request3.code.should.equal(202)
- body3.should.equal(b'second and last response')
+ body3.should.equal(b"second and last response")
@httprettified
@@ -213,27 +231,24 @@ def test_httpretty_should_support_a_list_of_successive_responses_urllib2(now):
def test_can_inspect_last_request(now):
"HTTPretty.last_request is a mimetools.Message request from last match"
- HTTPretty.register_uri(HTTPretty.POST, "http://api.github.com/",
- body='{"repositories": ["HTTPretty", "lettuce"]}')
+ HTTPretty.register_uri(
+ HTTPretty.POST,
+ "http://api.github.com/",
+ body='{"repositories": ["HTTPretty", "lettuce"]}',
+ )
request = urllib2.Request(
- 'http://api.github.com',
+ "http://api.github.com",
b'{"username": "gabrielfalcao"}',
- {
- 'content-type': 'text/json',
- },
+ {"content-type": "text/json"},
)
fd = urlopen(request)
got = fd.read()
fd.close()
- HTTPretty.last_request.method.should.equal('POST')
- HTTPretty.last_request.body.should.equal(
- b'{"username": "gabrielfalcao"}',
- )
- HTTPretty.last_request.headers['content-type'].should.equal(
- 'text/json',
- )
+ HTTPretty.last_request.method.should.equal("POST")
+ HTTPretty.last_request.body.should.equal(b'{"username": "gabrielfalcao"}')
+ HTTPretty.last_request.headers["content-type"].should.equal("text/json")
got.should.equal(b'{"repositories": ["HTTPretty", "lettuce"]}')
@@ -242,27 +257,24 @@ def test_can_inspect_last_request(now):
def test_can_inspect_last_request_with_ssl(now):
"HTTPretty.last_request is recorded even when mocking 'https' (SSL)"
- HTTPretty.register_uri(HTTPretty.POST, "https://secure.github.com/",
- body='{"repositories": ["HTTPretty", "lettuce"]}')
+ HTTPretty.register_uri(
+ HTTPretty.POST,
+ "https://secure.github.com/",
+ body='{"repositories": ["HTTPretty", "lettuce"]}',
+ )
request = urllib2.Request(
- 'https://secure.github.com',
+ "https://secure.github.com",
b'{"username": "gabrielfalcao"}',
- {
- 'content-type': 'text/json',
- },
+ {"content-type": "text/json"},
)
fd = urlopen(request)
got = fd.read()
fd.close()
- HTTPretty.last_request.method.should.equal('POST')
- HTTPretty.last_request.body.should.equal(
- b'{"username": "gabrielfalcao"}',
- )
- HTTPretty.last_request.headers['content-type'].should.equal(
- 'text/json',
- )
+ HTTPretty.last_request.method.should.equal("POST")
+ HTTPretty.last_request.body.should.equal(b'{"username": "gabrielfalcao"}')
+ HTTPretty.last_request.headers["content-type"].should.equal("text/json")
got.should.equal(b'{"repositories": ["HTTPretty", "lettuce"]}')
@@ -271,47 +283,49 @@ def test_can_inspect_last_request_with_ssl(now):
def test_httpretty_ignores_querystrings_from_registered_uri():
"HTTPretty should mock a simple GET with urllib2.read()"
- HTTPretty.register_uri(HTTPretty.GET, "http://yipit.com/?id=123",
- body="Find the best daily deals")
+ HTTPretty.register_uri(
+ HTTPretty.GET, "http://yipit.com/?id=123", body="Find the best daily deals"
+ )
- fd = urlopen('http://yipit.com/?id=123')
+ fd = urlopen("http://yipit.com/?id=123")
got = fd.read()
fd.close()
- got.should.equal(b'Find the best daily deals')
- HTTPretty.last_request.method.should.equal('GET')
- HTTPretty.last_request.path.should.equal('/?id=123')
+ got.should.equal(b"Find the best daily deals")
+ HTTPretty.last_request.method.should.equal("GET")
+ HTTPretty.last_request.path.should.equal("/?id=123")
@httprettified
@within(two=microseconds)
def test_callback_response(now):
- ("HTTPretty should all a callback function to be set as the body with"
- " urllib2")
+ ("HTTPretty should all a callback function to be set as the body with" " urllib2")
def request_callback(request, uri, headers):
- return [200, headers, "The {} response from {}".format(decode_utf8(request.method), uri)]
+ return [
+ 200,
+ headers,
+ "The {} response from {}".format(decode_utf8(request.method), uri),
+ ]
HTTPretty.register_uri(
- HTTPretty.GET, "https://api.yahoo.com/test",
- body=request_callback)
+ HTTPretty.GET, "https://api.yahoo.com/test", body=request_callback
+ )
- fd = urllib2.urlopen('https://api.yahoo.com/test')
+ fd = urllib2.urlopen("https://api.yahoo.com/test")
got = fd.read()
fd.close()
got.should.equal(b"The GET response from https://api.yahoo.com/test")
HTTPretty.register_uri(
- HTTPretty.POST, "https://api.yahoo.com/test_post",
- body=request_callback)
+ HTTPretty.POST, "https://api.yahoo.com/test_post", body=request_callback
+ )
request = urllib2.Request(
"https://api.yahoo.com/test_post",
b'{"username": "gabrielfalcao"}',
- {
- 'content-type': 'text/json',
- },
+ {"content-type": "text/json"},
)
fd = urllib2.urlopen(request)
got = fd.read()
@@ -330,9 +344,7 @@ def test_httpretty_should_allow_registering_regexes():
body="Found brand",
)
- request = urllib2.Request(
- "https://api.yipit.com/v1/deal;brand=GAP",
- )
+ request = urllib2.Request("https://api.yipit.com/v1/deal;brand=GAP")
fd = urllib2.urlopen(request)
got = fd.read()
fd.close()
diff --git a/tests/functional/testserver.py b/tests/functional/testserver.py
index 38d2ce4..0a874ea 100644
--- a/tests/functional/testserver.py
+++ b/tests/functional/testserver.py
@@ -36,6 +36,7 @@ from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from httpretty import HTTPretty
from httpretty.core import old_socket as true_socket
+
# from httpretty.compat import PY3
from httpretty.compat import binary_type
from httpretty.compat import text_type
@@ -44,10 +45,11 @@ from multiprocessing import Process
def utf8(s):
if isinstance(s, text_type):
- s = s.encode('utf-8')
+ s = s.encode("utf-8")
return binary_type(s)
+
# if not PY3:
# bytes = lambda s, *args: str(s)
@@ -71,14 +73,14 @@ class TornadoServer(object):
@classmethod
def get_handlers(cls):
- return Application([
- (r"/go-for-bubbles/?", BubblesHandler),
- (r"/come-again/?", ComeHandler),
- ])
+ return Application(
+ [(r"/go-for-bubbles/?", BubblesHandler), (r"/come-again/?", ComeHandler)]
+ )
def start(self):
def go(app, port, data={}):
from httpretty import HTTPretty
+
HTTPretty.disable()
http = HTTPServer(app)
@@ -114,10 +116,12 @@ class TCPServer(object):
def go(port):
from httpretty import HTTPretty
+
HTTPretty.disable()
import socket
+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.bind(('localhost', port))
+ s.bind(("localhost", port))
s.listen(True)
conn, addr = s.accept()
@@ -145,11 +149,11 @@ class TCPClient(object):
def __init__(self, port):
self.port = int(port)
self.sock = true_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.connect(('localhost', self.port))
+ self.sock.connect(("localhost", self.port))
def send(self, data):
if isinstance(data, text_type):
- data = data.encode('utf-8')
+ data = data.encode("utf-8")
self.sock.sendall(data)
return self.sock.recv(len(data) + 11)
diff --git a/tests/pyopenssl/__init__.py b/tests/pyopenssl/__init__.py
index 4eaed8c..ad1391a 100644
--- a/tests/pyopenssl/__init__.py
+++ b/tests/pyopenssl/__init__.py
@@ -25,4 +25,5 @@
# OTHER DEALINGS IN THE SOFTWARE.
import warnings
-warnings.simplefilter('ignore')
+
+warnings.simplefilter("ignore")
diff --git a/tests/pyopenssl/test_mock.py b/tests/pyopenssl/test_mock.py
index 89320f2..54572fa 100644
--- a/tests/pyopenssl/test_mock.py
+++ b/tests/pyopenssl/test_mock.py
@@ -34,12 +34,13 @@ from sure import expect
@httprettified
def test_httpretty_overrides_when_pyopenssl_installed():
- ('HTTPretty should remove PyOpenSSLs urllib3 mock if it is installed')
+ ("HTTPretty should remove PyOpenSSLs urllib3 mock if it is installed")
# And HTTPretty works successfully
- HTTPretty.register_uri(HTTPretty.GET, "https://yipit.com/",
- body="Find the best daily deals")
+ HTTPretty.register_uri(
+ HTTPretty.GET, "https://yipit.com/", body="Find the best daily deals"
+ )
- response = requests.get('https://yipit.com')
- expect(response.text).to.equal('Find the best daily deals')
- expect(HTTPretty.last_request.method).to.equal('GET')
- expect(HTTPretty.last_request.path).to.equal('/')
+ response = requests.get("https://yipit.com")
+ expect(response.text).to.equal("Find the best daily deals")
+ expect(HTTPretty.last_request.method).to.equal("GET")
+ expect(HTTPretty.last_request.path).to.equal("/")
diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py
index 450b021..0758b73 100644
--- a/tests/unit/test_core.py
+++ b/tests/unit/test_core.py
@@ -19,41 +19,47 @@ class SocketErrorStub(Exception):
def test_request_stubs_internals():
- ("HTTPrettyRequest is a BaseHTTPRequestHandler that replaces "
- "real socket file descriptors with in-memory ones")
+ (
+ "HTTPrettyRequest is a BaseHTTPRequestHandler that replaces "
+ "real socket file descriptors with in-memory ones"
+ )
# Given a valid HTTP request header string
- headers = "\r\n".join([
- 'POST /somewhere/?name=foo&age=bar HTTP/1.1',
- 'accept-encoding: identity',
- 'host: github.com',
- 'content-type: application/json',
- 'connection: close',
- 'user-agent: Python-urllib/2.7',
- ])
+ headers = "\r\n".join(
+ [
+ "POST /somewhere/?name=foo&age=bar HTTP/1.1",
+ "accept-encoding: identity",
+ "host: github.com",
+ "content-type: application/json",
+ "connection: close",
+ "user-agent: Python-urllib/2.7",
+ ]
+ )
# When I create a HTTPrettyRequest with an empty body
- request = HTTPrettyRequest(headers, body='')
+ request = HTTPrettyRequest(headers, body="")
# Then it should have parsed the headers
- dict(request.headers).should.equal({
- 'accept-encoding': 'identity',
- 'connection': 'close',
- 'content-type': 'application/json',
- 'host': 'github.com',
- 'user-agent': 'Python-urllib/2.7'
- })
+ dict(request.headers).should.equal(
+ {
+ "accept-encoding": "identity",
+ "connection": "close",
+ "content-type": "application/json",
+ "host": "github.com",
+ "user-agent": "Python-urllib/2.7",
+ }
+ )
# And the `rfile` should be a StringIO
- type_as_str = StringIO.__module__ + '.' + StringIO.__name__
+ type_as_str = StringIO.__module__ + "." + StringIO.__name__
- request.should.have.property('rfile').being.a(type_as_str)
+ request.should.have.property("rfile").being.a(type_as_str)
# And the `wfile` should be a StringIO
- request.should.have.property('wfile').being.a(type_as_str)
+ request.should.have.property("wfile").being.a(type_as_str)
# And the `method` should be available
- request.should.have.property('method').being.equal('POST')
+ request.should.have.property("method").being.equal("POST")
def test_request_parse_querystring():
@@ -61,88 +67,92 @@ def test_request_parse_querystring():
# Given a request string containing a unicode encoded querystring
- headers = "\r\n".join([
- 'POST /create?name=Gabriel+Falcão HTTP/1.1',
- 'Content-Type: multipart/form-data',
- ])
+ headers = "\r\n".join(
+ [
+ "POST /create?name=Gabriel+Falcão HTTP/1.1",
+ "Content-Type: multipart/form-data",
+ ]
+ )
# When I create a HTTPrettyRequest with an empty body
- request = HTTPrettyRequest(headers, body='')
+ request = HTTPrettyRequest(headers, body="")
# Then it should have a parsed querystring
- request.querystring.should.equal({'name': ['Gabriel Falcão']})
+ request.querystring.should.equal({"name": ["Gabriel Falcão"]})
def test_request_parse_body_when_it_is_application_json():
- ("HTTPrettyRequest#parse_request_body recognizes the "
- "content-type `application/json` and parses it")
+ (
+ "HTTPrettyRequest#parse_request_body recognizes the "
+ "content-type `application/json` and parses it"
+ )
# Given a request string containing a unicode encoded querystring
- headers = "\r\n".join([
- 'POST /create HTTP/1.1',
- 'Content-Type: application/json',
- ])
+ headers = "\r\n".join(["POST /create HTTP/1.1", "Content-Type: application/json"])
# And a valid json body
- body = json.dumps({'name': 'Gabriel Falcão'})
+ body = json.dumps({"name": "Gabriel Falcão"})
# When I create a HTTPrettyRequest with that data
request = HTTPrettyRequest(headers, body)
# Then it should have a parsed body
- request.parsed_body.should.equal({'name': 'Gabriel Falcão'})
+ request.parsed_body.should.equal({"name": "Gabriel Falcão"})
def test_request_parse_body_when_it_is_text_json():
- ("HTTPrettyRequest#parse_request_body recognizes the "
- "content-type `text/json` and parses it")
+ (
+ "HTTPrettyRequest#parse_request_body recognizes the "
+ "content-type `text/json` and parses it"
+ )
# Given a request string containing a unicode encoded querystring
- headers = "\r\n".join([
- 'POST /create HTTP/1.1',
- 'Content-Type: text/json',
- ])
+ headers = "\r\n".join(["POST /create HTTP/1.1", "Content-Type: text/json"])
# And a valid json body
- body = json.dumps({'name': 'Gabriel Falcão'})
+ body = json.dumps({"name": "Gabriel Falcão"})
# When I create a HTTPrettyRequest with that data
request = HTTPrettyRequest(headers, body)
# Then it should have a parsed body
- request.parsed_body.should.equal({'name': 'Gabriel Falcão'})
+ request.parsed_body.should.equal({"name": "Gabriel Falcão"})
def test_request_parse_body_when_it_is_urlencoded():
- ("HTTPrettyRequest#parse_request_body recognizes the "
- "content-type `application/x-www-form-urlencoded` and parses it")
+ (
+ "HTTPrettyRequest#parse_request_body recognizes the "
+ "content-type `application/x-www-form-urlencoded` and parses it"
+ )
# Given a request string containing a unicode encoded querystring
- headers = "\r\n".join([
- 'POST /create HTTP/1.1',
- 'Content-Type: application/x-www-form-urlencoded',
- ])
+ headers = "\r\n".join(
+ ["POST /create HTTP/1.1", "Content-Type: application/x-www-form-urlencoded"]
+ )
# And a valid urlencoded body
- body = "name=Gabriel+Falcão&age=25&projects=httpretty&projects=sure&projects=lettuce"
+ body = (
+ "name=Gabriel+Falcão&age=25&projects=httpretty&projects=sure&projects=lettuce"
+ )
# When I create a HTTPrettyRequest with that data
request = HTTPrettyRequest(headers, body)
# Then it should have a parsed body
- request.parsed_body.should.equal({
- 'name': ['Gabriel Falcão'],
- 'age': ["25"],
- 'projects': ["httpretty", "sure", "lettuce"]
- })
+ request.parsed_body.should.equal(
+ {
+ "name": ["Gabriel Falcão"],
+ "age": ["25"],
+ "projects": ["httpretty", "sure", "lettuce"],
+ }
+ )
def test_request_parse_body_when_unrecognized():
- ("HTTPrettyRequest#parse_request_body returns the value as "
- "is if the Content-Type is not recognized")
+ (
+ "HTTPrettyRequest#parse_request_body returns the value as "
+ "is if the Content-Type is not recognized"
+ )
# Given a request string containing a unicode encoded querystring
- headers = "\r\n".join([
- 'POST /create HTTP/1.1',
- 'Content-Type: whatever',
- ])
+ headers = "\r\n".join(["POST /create HTTP/1.1", "Content-Type: whatever"])
# And a valid urlencoded body
body = "foobar:\nlalala"
@@ -154,14 +164,10 @@ def test_request_parse_body_when_unrecognized():
def test_request_string_representation():
- ("HTTPrettyRequest should have a debug-friendly "
- "string representation")
+ ("HTTPrettyRequest should have a debug-friendly " "string representation")
# Given a request string containing a unicode encoded querystring
- headers = "\r\n".join([
- 'POST /create HTTP/1.1',
- 'Content-Type: JPEG-baby',
- ])
+ headers = "\r\n".join(["POST /create HTTP/1.1", "Content-Type: JPEG-baby"])
# And a valid urlencoded body
body = "foobar:\nlalala"
@@ -169,12 +175,16 @@ def test_request_string_representation():
request = HTTPrettyRequest(headers, body)
# Then its string representation should show the headers and the body
- str(request).should.equal('<HTTPrettyRequest("JPEG-baby", total_headers=1, body_length=14)>')
+ str(request).should.equal(
+ '<HTTPrettyRequest("JPEG-baby", total_headers=1, body_length=14)>'
+ )
def test_fake_ssl_socket_proxies_its_ow_socket():
- ("FakeSSLSocket is a simpel wrapper around its own socket, "
- "which was designed to be a HTTPretty fake socket")
+ (
+ "FakeSSLSocket is a simpel wrapper around its own socket, "
+ "which was designed to be a HTTPretty fake socket"
+ )
# Given a sentinel mock object
socket = Mock()
@@ -189,7 +199,7 @@ def test_fake_ssl_socket_proxies_its_ow_socket():
socket.send.assert_called_once_with("FOO")
-@patch('httpretty.core.datetime')
+@patch("httpretty.core.datetime")
def test_fakesock_socket_getpeercert(dt):
("fakesock.socket#getpeercert should return a hardcoded fake certificate")
# Background:
@@ -199,24 +209,27 @@ def test_fakesock_socket_getpeercert(dt):
socket = fakesock.socket()
# And that it's bound to some host
- socket._host = 'somewhere.com'
+ socket._host = "somewhere.com"
# When I retrieve the peer certificate
certificate = socket.getpeercert()
# Then it should return a hardcoded value
- certificate.should.equal({
- u'notAfter': 'Sep 29 04:20:00 GMT',
- u'subject': (
- ((u'organizationName', u'*.somewhere.com'),),
- ((u'organizationalUnitName', u'Domain Control Validated'),),
- ((u'commonName', u'*.somewhere.com'),)),
- u'subjectAltName': (
- (u'DNS', u'*.somewhere.com'),
- (u'DNS', u'somewhere.com'),
- (u'DNS', u'*')
- )
- })
+ certificate.should.equal(
+ {
+ "notAfter": "Sep 29 04:20:00 GMT",
+ "subject": (
+ (("organizationName", "*.somewhere.com"),),
+ (("organizationalUnitName", "Domain Control Validated"),),
+ (("commonName", "*.somewhere.com"),),
+ ),
+ "subjectAltName": (
+ ("DNS", "*.somewhere.com"),
+ ("DNS", "somewhere.com"),
+ ("DNS", "*"),
+ ),
+ }
+ )
def test_fakesock_socket_ssl():
@@ -234,31 +247,38 @@ def test_fakesock_socket_ssl():
result.should.equal(sentinel)
-@patch('httpretty.core.old_socket')
-@patch('httpretty.core.POTENTIAL_HTTP_PORTS')
+@patch("httpretty.core.old_socket")
+@patch("httpretty.core.POTENTIAL_HTTP_PORTS")
def test_fakesock_socket_connect_fallback(POTENTIAL_HTTP_PORTS, old_socket):
- ("fakesock.socket#connect should open a real connection if the "
- "given port is not a potential http port")
+ (
+ "fakesock.socket#connect should open a real connection if the "
+ "given port is not a potential http port"
+ )
# Background: the potential http ports are 80 and 443
- POTENTIAL_HTTP_PORTS.__contains__.side_effect = lambda other: int(other) in (80, 443)
+ POTENTIAL_HTTP_PORTS.__contains__.side_effect = lambda other: int(other) in (
+ 80,
+ 443,
+ )
# Given a fake socket instance
socket = fakesock.socket()
# When it is connected to a remote server in a port that isn't 80 nor 443
- socket.connect(('somewhere.com', 42))
+ socket.connect(("somewhere.com", 42))
# Then it should have open a real connection in the background
- old_socket.return_value.connect.assert_called_once_with(('somewhere.com', 42))
+ old_socket.return_value.connect.assert_called_once_with(("somewhere.com", 42))
# And _closed is set to False
socket._closed.should.be.false
-@patch('httpretty.core.old_socket')
+@patch("httpretty.core.old_socket")
def test_fakesock_socket_close(old_socket):
- ("fakesock.socket#close should close the actual socket in case "
- "it's not http and _closed is False")
+ (
+ "fakesock.socket#close should close the actual socket in case "
+ "it's not http and _closed is False"
+ )
# Given a fake socket instance that is synthetically open
socket = fakesock.socket()
socket._closed = False
@@ -274,22 +294,24 @@ def test_fakesock_socket_close(old_socket):
socket._closed.should.be.true
-@patch('httpretty.core.old_socket')
+@patch("httpretty.core.old_socket")
def test_fakesock_socket_makefile(old_socket):
- ("fakesock.socket#makefile should set the mode, "
- "bufsize and return its mocked file descriptor")
+ (
+ "fakesock.socket#makefile should set the mode, "
+ "bufsize and return its mocked file descriptor"
+ )
# Given a fake socket that has a mocked Entry associated with it
socket = fakesock.socket()
socket._entry = Mock()
# When I call makefile()
- fd = socket.makefile(mode='rw', bufsize=512)
+ fd = socket.makefile(mode="rw", bufsize=512)
# Then it should have returned the socket's own filedescriptor
expect(fd).to.equal(socket.fd)
# And the mode should have been set in the socket instance
- socket._mode.should.equal('rw')
+ socket._mode.should.equal("rw")
# And the bufsize should have been set in the socket instance
socket._bufsize.should.equal(512)
@@ -297,23 +319,27 @@ def test_fakesock_socket_makefile(old_socket):
socket._entry.fill_filekind.assert_called_once_with(fd)
-@patch('httpretty.core.old_socket')
+@patch("httpretty.core.old_socket")
def test_fakesock_socket_real_sendall(old_socket):
- ("fakesock.socket#real_sendall calls truesock#connect and bails "
- "out when not http")
+ (
+ "fakesock.socket#real_sendall calls truesock#connect and bails "
+ "out when not http"
+ )
# Background: the real socket will stop returning bytes after the
# first call
real_socket = old_socket.return_value
- real_socket.recv.side_effect = [b'response from server', b""]
+ real_socket.recv.side_effect = [b"response from server", b""]
# Given a fake socket
socket = fakesock.socket()
# When I call real_sendall with data, some args and kwargs
- socket.real_sendall(b"SOMEDATA", b'some extra args...', foo=b'bar')
+ socket.real_sendall(b"SOMEDATA", b"some extra args...", foo=b"bar")
# Then it should have called sendall in the real socket
- real_socket.sendall.assert_called_once_with(b"SOMEDATA", b'some extra args...', foo=b'bar')
+ real_socket.sendall.assert_called_once_with(
+ b"SOMEDATA", b"some extra args...", foo=b"bar"
+ )
# And setblocking was never called
real_socket.setblocking.called.should.be.false
@@ -322,38 +348,40 @@ def test_fakesock_socket_real_sendall(old_socket):
real_socket.recv.called.should.be.false
# And the buffer is empty
- socket.fd.read().should.equal(b'')
+ socket.fd.read().should.equal(b"")
# And connect was never called
real_socket.connect.called.should.be.false
-@patch('httpretty.core.old_socket')
+@patch("httpretty.core.old_socket")
def test_fakesock_socket_real_sendall_when_http(old_socket):
- ("fakesock.socket#real_sendall sends data and buffers "
- "the response in the file descriptor")
+ (
+ "fakesock.socket#real_sendall sends data and buffers "
+ "the response in the file descriptor"
+ )
# Background: the real socket will stop returning bytes after the
# first call
real_socket = old_socket.return_value
- real_socket.recv.side_effect = [b'response from server', b""]
+ real_socket.recv.side_effect = [b"response from server", b""]
# Given a fake socket
socket = fakesock.socket()
socket.is_http = True
# When I call real_sendall with data, some args and kwargs
- socket.real_sendall(b"SOMEDATA", b'some extra args...', foo=b'bar')
+ socket.real_sendall(b"SOMEDATA", b"some extra args...", foo=b"bar")
# Then it should have called sendall in the real socket
- real_socket.sendall.assert_called_once_with(b"SOMEDATA", b'some extra args...', foo=b'bar')
+ real_socket.sendall.assert_called_once_with(
+ b"SOMEDATA", b"some extra args...", foo=b"bar"
+ )
# And the socket was set to blocking
real_socket.setblocking.assert_called_once_with(1)
# And recv was called with the bufsize
- real_socket.recv.assert_has_calls([
- call(socket._bufsize)
- ])
+ real_socket.recv.assert_has_calls([call(socket._bufsize)])
# And the buffer should contain the data from the server
socket.fd.read().should.equal(b"response from server")
@@ -362,33 +390,33 @@ def test_fakesock_socket_real_sendall_when_http(old_socket):
real_socket.connect.called.should.be.true
-@patch('httpretty.core.old_socket')
-@patch('httpretty.core.socket')
+@patch("httpretty.core.old_socket")
+@patch("httpretty.core.socket")
def test_fakesock_socket_real_sendall_continue_eagain_when_http(socket, old_socket):
("fakesock.socket#real_sendall should continue if the socket error was EAGAIN")
socket.error = SocketErrorStub
# Background: the real socket will stop returning bytes after the
# first call
real_socket = old_socket.return_value
- real_socket.recv.side_effect = [SocketErrorStub(errno.EAGAIN), b'after error', b""]
+ real_socket.recv.side_effect = [SocketErrorStub(errno.EAGAIN), b"after error", b""]
# Given a fake socket
socket = fakesock.socket()
socket.is_http = True
# When I call real_sendall with data, some args and kwargs
- socket.real_sendall(b"SOMEDATA", b'some extra args...', foo=b'bar')
+ socket.real_sendall(b"SOMEDATA", b"some extra args...", foo=b"bar")
# Then it should have called sendall in the real socket
- real_socket.sendall.assert_called_once_with(b"SOMEDATA", b'some extra args...', foo=b'bar')
+ real_socket.sendall.assert_called_once_with(
+ b"SOMEDATA", b"some extra args...", foo=b"bar"
+ )
# And the socket was set to blocking
real_socket.setblocking.assert_called_once_with(1)
# And recv was called with the bufsize
- real_socket.recv.assert_has_calls([
- call(socket._bufsize)
- ])
+ real_socket.recv.assert_has_calls([call(socket._bufsize)])
# And the buffer should contain the data from the server
socket.fd.read().should.equal(b"after error")
@@ -397,25 +425,27 @@ def test_fakesock_socket_real_sendall_continue_eagain_when_http(socket, old_sock
real_socket.connect.called.should.be.true
-@patch('httpretty.core.old_socket')
-@patch('httpretty.core.socket')
+@patch("httpretty.core.old_socket")
+@patch("httpretty.core.socket")
def test_fakesock_socket_real_sendall_socket_error_when_http(socket, old_socket):
("fakesock.socket#real_sendall should continue if the socket error was EAGAIN")
socket.error = SocketErrorStub
# Background: the real socket will stop returning bytes after the
# first call
real_socket = old_socket.return_value
- real_socket.recv.side_effect = [SocketErrorStub(42), b'after error', ""]
+ real_socket.recv.side_effect = [SocketErrorStub(42), b"after error", ""]
# Given a fake socket
socket = fakesock.socket()
socket.is_http = True
# When I call real_sendall with data, some args and kwargs
- socket.real_sendall(b"SOMEDATA", b'some extra args...', foo=b'bar')
+ socket.real_sendall(b"SOMEDATA", b"some extra args...", foo=b"bar")
# Then it should have called sendall in the real socket
- real_socket.sendall.assert_called_once_with(b"SOMEDATA", b'some extra args...', foo=b'bar')
+ real_socket.sendall.assert_called_once_with(
+ b"SOMEDATA", b"some extra args...", foo=b"bar"
+ )
# And the socket was set to blocking
real_socket.setblocking.assert_called_once_with(1)
@@ -430,14 +460,16 @@ def test_fakesock_socket_real_sendall_socket_error_when_http(socket, old_socket)
real_socket.connect.called.should.be.true
-@patch('httpretty.core.old_socket')
-@patch('httpretty.core.POTENTIAL_HTTP_PORTS')
-def test_fakesock_socket_real_sendall_when_sending_data(POTENTIAL_HTTP_PORTS, old_socket):
+@patch("httpretty.core.old_socket")
+@patch("httpretty.core.POTENTIAL_HTTP_PORTS")
+def test_fakesock_socket_real_sendall_when_sending_data(
+ POTENTIAL_HTTP_PORTS, old_socket
+):
("fakesock.socket#real_sendall should connect before sending data")
# Background: the real socket will stop returning bytes after the
# first call
real_socket = old_socket.return_value
- real_socket.recv.side_effect = [b'response from foobar :)', b""]
+ real_socket.recv.side_effect = [b"response from foobar :)", b""]
# And the potential http port is 4000
POTENTIAL_HTTP_PORTS.__contains__.side_effect = lambda other: int(other) == 4000
@@ -447,107 +479,119 @@ def test_fakesock_socket_real_sendall_when_sending_data(POTENTIAL_HTTP_PORTS, ol
socket = fakesock.socket()
# When I call connect to a server in a port that is considered HTTP
- socket.connect(('foobar.com', 4000))
+ socket.connect(("foobar.com", 4000))
# And send some data
socket.real_sendall(b"SOMEDATA")
# Then connect should have been called
- real_socket.connect.assert_called_once_with(('foobar.com', 4000))
+ real_socket.connect.assert_called_once_with(("foobar.com", 4000))
# And the socket was set to blocking
real_socket.setblocking.assert_called_once_with(1)
# And recv was called with the bufsize
- real_socket.recv.assert_has_calls([
- call(socket._bufsize)
- ])
+ real_socket.recv.assert_has_calls([call(socket._bufsize)])
# And the buffer should contain the data from the server
socket.fd.read().should.equal(b"response from foobar :)")
-@patch('httpretty.core.old_socket')
-@patch('httpretty.core.httpretty')
-@patch('httpretty.core.POTENTIAL_HTTP_PORTS')
-def test_fakesock_socket_sendall_with_valid_requestline(POTENTIAL_HTTP_PORTS, httpretty, old_socket):
- ("fakesock.socket#sendall should create an entry if it's given a valid request line")
- matcher = Mock(name='matcher')
- info = Mock(name='info')
+@patch("httpretty.core.old_socket")
+@patch("httpretty.core.httpretty")
+@patch("httpretty.core.POTENTIAL_HTTP_PORTS")
+def test_fakesock_socket_sendall_with_valid_requestline(
+ POTENTIAL_HTTP_PORTS, httpretty, old_socket
+):
+ (
+ "fakesock.socket#sendall should create an entry if it's given a valid request line"
+ )
+ matcher = Mock(name="matcher")
+ info = Mock(name="info")
httpretty.match_uriinfo.return_value = (matcher, info)
- httpretty.register_uri(httpretty.GET, 'http://foo.com/foobar')
+ httpretty.register_uri(httpretty.GET, "http://foo.com/foobar")
# Background:
# using a subclass of socket that mocks out real_sendall
class MySocket(fakesock.socket):
def real_sendall(self, data, *args, **kw):
- raise AssertionError('should never call this...')
+ raise AssertionError("should never call this...")
# Given an instance of that socket
socket = MySocket()
# And that is is considered http
- socket.connect(('foo.com', 80))
+ socket.connect(("foo.com", 80))
# When I try to send data
socket.sendall(b"GET /foobar HTTP/1.1\r\nContent-Type: application/json\r\n\r\n")
-@patch('httpretty.core.old_socket')
-@patch('httpretty.core.httpretty')
-@patch('httpretty.core.POTENTIAL_HTTP_PORTS')
-def test_fakesock_socket_sendall_with_valid_requestline_2(POTENTIAL_HTTP_PORTS, httpretty, old_socket):
- ("fakesock.socket#sendall should create an entry if it's given a valid request line")
- matcher = Mock(name='matcher')
- info = Mock(name='info')
+@patch("httpretty.core.old_socket")
+@patch("httpretty.core.httpretty")
+@patch("httpretty.core.POTENTIAL_HTTP_PORTS")
+def test_fakesock_socket_sendall_with_valid_requestline_2(
+ POTENTIAL_HTTP_PORTS, httpretty, old_socket
+):
+ (
+ "fakesock.socket#sendall should create an entry if it's given a valid request line"
+ )
+ matcher = Mock(name="matcher")
+ info = Mock(name="info")
httpretty.match_uriinfo.return_value = (matcher, info)
- httpretty.register_uri(httpretty.GET, 'http://foo.com/foobar')
+ httpretty.register_uri(httpretty.GET, "http://foo.com/foobar")
# Background:
# using a subclass of socket that mocks out real_sendall
class MySocket(fakesock.socket):
def real_sendall(self, data, *args, **kw):
- raise AssertionError('should never call this...')
+ raise AssertionError("should never call this...")
# Given an instance of that socket
socket = MySocket()
# And that is is considered http
- socket.connect(('foo.com', 80))
+ socket.connect(("foo.com", 80))
# When I try to send data
socket.sendall(b"GET /foobar HTTP/1.1\r\nContent-Type: application/json\r\n\r\n")
-@patch('httpretty.core.old_socket')
-@patch('httpretty.core.POTENTIAL_HTTP_PORTS')
-def test_fakesock_socket_sendall_with_body_data_no_entry(POTENTIAL_HTTP_PORTS, old_socket):
- ("fakesock.socket#sendall should call real_sendall when not parsing headers and there is no entry")
+@patch("httpretty.core.old_socket")
+@patch("httpretty.core.POTENTIAL_HTTP_PORTS")
+def test_fakesock_socket_sendall_with_body_data_no_entry(
+ POTENTIAL_HTTP_PORTS, old_socket
+):
+ (
+ "fakesock.socket#sendall should call real_sendall when not parsing headers and there is no entry"
+ )
# Background:
# Using a subclass of socket that mocks out real_sendall
class MySocket(fakesock.socket):
def real_sendall(self, data):
- data.should.equal(b'BLABLABLABLA')
- return 'cool'
+ data.should.equal(b"BLABLABLABLA")
+ return "cool"
# Given an instance of that socket
socket = MySocket()
socket._entry = None
# And that is is considered http
- socket.connect(('foo.com', 80))
+ socket.connect(("foo.com", 80))
# When I try to send data
result = socket.sendall(b"BLABLABLABLA")
# Then the result should be the return value from real_sendall
- result.should.equal('cool')
+ result.should.equal("cool")
-@patch('httpretty.core.old_socket')
-@patch('httpretty.core.POTENTIAL_HTTP_PORTS')
-def test_fakesock_socket_sendall_with_body_data_with_entry(POTENTIAL_HTTP_PORTS, old_socket):
+@patch("httpretty.core.old_socket")
+@patch("httpretty.core.POTENTIAL_HTTP_PORTS")
+def test_fakesock_socket_sendall_with_body_data_with_entry(
+ POTENTIAL_HTTP_PORTS, old_socket
+):
("fakesock.socket#sendall should call real_sendall when there is no entry")
# Background:
# Using a subclass of socket that mocks out real_sendall
@@ -561,96 +605,110 @@ def test_fakesock_socket_sendall_with_body_data_with_entry(POTENTIAL_HTTP_PORTS,
socket = MySocket()
# And that is is considered http
- socket.connect(('foo.com', 80))
+ socket.connect(("foo.com", 80))
# When I try to send data
socket.sendall(b"BLABLABLABLA")
# Then it should have called real_sendall
- data_sent.should.equal([b'BLABLABLABLA'])
+ data_sent.should.equal([b"BLABLABLABLA"])
-@patch('httpretty.core.httpretty.match_uriinfo')
-@patch('httpretty.core.old_socket')
-@patch('httpretty.core.POTENTIAL_HTTP_PORTS')
-def test_fakesock_socket_sendall_with_body_data_with_chunked_entry(POTENTIAL_HTTP_PORTS, old_socket, match_uriinfo):
+@patch("httpretty.core.httpretty.match_uriinfo")
+@patch("httpretty.core.old_socket")
+@patch("httpretty.core.POTENTIAL_HTTP_PORTS")
+def test_fakesock_socket_sendall_with_body_data_with_chunked_entry(
+ POTENTIAL_HTTP_PORTS, old_socket, match_uriinfo
+):
("fakesock.socket#sendall should call real_sendall when not ")
# Background:
# Using a subclass of socket that mocks out real_sendall
class MySocket(fakesock.socket):
def real_sendall(self, data):
- raise AssertionError('should have never been called')
+ raise AssertionError("should have never been called")
- matcher = Mock(name='matcher')
- info = Mock(name='info')
+ matcher = Mock(name="matcher")
+ info = Mock(name="info")
httpretty.match_uriinfo.return_value = (matcher, info)
# Using a mocked entry
entry = Mock()
- entry.method = 'GET'
- entry.info.path = '/foo'
+ entry.method = "GET"
+ entry.info.path = "/foo"
- entry.request.headers = {
- 'transfer-encoding': 'chunked',
- }
- entry.request.body = b''
+ entry.request.headers = {"transfer-encoding": "chunked"}
+ entry.request.body = b""
# Given an instance of that socket
socket = MySocket()
socket._entry = entry
# And that is is considered http
- socket.connect(('foo.com', 80))
+ socket.connect(("foo.com", 80))
# When I try to send data
socket.sendall(b"BLABLABLABLA")
# Then the entry should have that body
- httpretty.last_request.body.should.equal(b'BLABLABLABLA')
+ httpretty.last_request.body.should.equal(b"BLABLABLABLA")
def test_URIMatcher_respects_querystring():
("URIMatcher response querystring")
- matcher = URIMatcher('http://www.foo.com/?query=true', None)
- info = URIInfo.from_uri('http://www.foo.com/', None)
+ matcher = URIMatcher("http://www.foo.com/?query=true", None)
+ info = URIInfo.from_uri("http://www.foo.com/", None)
assert matcher.matches(info)
- matcher = URIMatcher('http://www.foo.com/?query=true', None, match_querystring=True)
- info = URIInfo.from_uri('http://www.foo.com/', None)
+ matcher = URIMatcher("http://www.foo.com/?query=true", None, match_querystring=True)
+ info = URIInfo.from_uri("http://www.foo.com/", None)
assert not matcher.matches(info)
- matcher = URIMatcher('http://www.foo.com/?query=true', None, match_querystring=True)
- info = URIInfo.from_uri('http://www.foo.com/?query=true', None)
+ matcher = URIMatcher("http://www.foo.com/?query=true", None, match_querystring=True)
+ info = URIInfo.from_uri("http://www.foo.com/?query=true", None)
assert matcher.matches(info)
- matcher = URIMatcher('http://www.foo.com/?query=true&unquery=false', None, match_querystring=True)
- info = URIInfo.from_uri('http://www.foo.com/?unquery=false&query=true', None)
+ matcher = URIMatcher(
+ "http://www.foo.com/?query=true&unquery=false", None, match_querystring=True
+ )
+ info = URIInfo.from_uri("http://www.foo.com/?unquery=false&query=true", None)
assert matcher.matches(info)
- matcher = URIMatcher('http://www.foo.com/?unquery=false&query=true', None, match_querystring=True)
- info = URIInfo.from_uri('http://www.foo.com/?query=true&unquery=false', None)
+ matcher = URIMatcher(
+ "http://www.foo.com/?unquery=false&query=true", None, match_querystring=True
+ )
+ info = URIInfo.from_uri("http://www.foo.com/?query=true&unquery=false", None)
assert matcher.matches(info)
def test_URIMatcher_equality_respects_querystring():
("URIMatcher equality check should check querystring")
- matcher_a = URIMatcher('http://www.foo.com/?query=true', None)
- matcher_b = URIMatcher('http://www.foo.com/?query=false', None)
+ matcher_a = URIMatcher("http://www.foo.com/?query=true", None)
+ matcher_b = URIMatcher("http://www.foo.com/?query=false", None)
assert matcher_a == matcher_b
- matcher_a = URIMatcher('http://www.foo.com/?query=true', None)
- matcher_b = URIMatcher('http://www.foo.com/', None)
+ matcher_a = URIMatcher("http://www.foo.com/?query=true", None)
+ matcher_b = URIMatcher("http://www.foo.com/", None)
assert matcher_a == matcher_b
- matcher_a = URIMatcher('http://www.foo.com/?query=true', None, match_querystring=True)
- matcher_b = URIMatcher('http://www.foo.com/?query=false', None, match_querystring=True)
+ matcher_a = URIMatcher(
+ "http://www.foo.com/?query=true", None, match_querystring=True
+ )
+ matcher_b = URIMatcher(
+ "http://www.foo.com/?query=false", None, match_querystring=True
+ )
assert not matcher_a == matcher_b
- matcher_a = URIMatcher('http://www.foo.com/?query=true', None, match_querystring=True)
- matcher_b = URIMatcher('http://www.foo.com/', None, match_querystring=True)
+ matcher_a = URIMatcher(
+ "http://www.foo.com/?query=true", None, match_querystring=True
+ )
+ matcher_b = URIMatcher("http://www.foo.com/", None, match_querystring=True)
assert not matcher_a == matcher_b
- matcher_a = URIMatcher('http://www.foo.com/?query=true&unquery=false', None, match_querystring=True)
- matcher_b = URIMatcher('http://www.foo.com/?unquery=false&query=true', None, match_querystring=True)
+ matcher_a = URIMatcher(
+ "http://www.foo.com/?query=true&unquery=false", None, match_querystring=True
+ )
+ matcher_b = URIMatcher(
+ "http://www.foo.com/?unquery=false&query=true", None, match_querystring=True
+ )
assert matcher_a == matcher_b
diff --git a/tests/unit/test_httpretty.py b/tests/unit/test_httpretty.py
index 3ae4f4f..f072b31 100644
--- a/tests/unit/test_httpretty.py
+++ b/tests/unit/test_httpretty.py
@@ -50,38 +50,40 @@ Content-Type: %(content_type)s
def test_httpretty_should_raise_proper_exception_on_inconsistent_length():
- ("HTTPretty should raise proper exception on inconsistent Content-Length / "
- "registered response body")
+ (
+ "HTTPretty should raise proper exception on inconsistent Content-Length / "
+ "registered response body"
+ )
HTTPretty.register_uri.when.called_with(
HTTPretty.GET,
"http://github.com/gabrielfalcao",
body="that's me!",
- adding_headers={
- 'Content-Length': '999'
- }
+ adding_headers={"Content-Length": "999"},
).should.have.raised(
HTTPrettyError,
'HTTPretty got inconsistent parameters. The header Content-Length you registered expects size "999" '
- 'but the body you registered for that has actually length "10".'
+ 'but the body you registered for that has actually length "10".',
)
def test_httpretty_should_raise_on_socket_send_when_uri_registered():
- ("HTTPretty should raise a RuntimeError when the fakesocket is "
- "used in an invalid usage.")
+ (
+ "HTTPretty should raise a RuntimeError when the fakesocket is "
+ "used in an invalid usage."
+ )
import socket
+
HTTPretty.enable()
- HTTPretty.register_uri(HTTPretty.GET,
- 'http://127.0.0.1:5000')
+ HTTPretty.register_uri(HTTPretty.GET, "http://127.0.0.1:5000")
expect(core.POTENTIAL_HTTP_PORTS).to.be.equal(set([80, 5000]))
expect(core.POTENTIAL_HTTPS_PORTS).to.be.equal(set([443]))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect(('127.0.0.1', 5000))
- expect(sock.send).when.called_with(b'whatever').to.throw(RuntimeError)
+ sock.connect(("127.0.0.1", 5000))
+ expect(sock.send).when.called_with(b"whatever").to.throw(RuntimeError)
sock.close()
# restore the previous value
@@ -91,17 +93,20 @@ def test_httpretty_should_raise_on_socket_send_when_uri_registered():
def test_httpretty_should_not_raise_on_socket_send_when_uri_not_registered():
- ("HTTPretty should not raise a RuntimeError when the fakesocket "
- "is used in an invalid usage.")
+ (
+ "HTTPretty should not raise a RuntimeError when the fakesocket "
+ "is used in an invalid usage."
+ )
import socket
+
HTTPretty.enable()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
sock.setblocking(0)
- expect(sock.sendto).when.called_with(b'whatever',
- ('127.0.0.1', 53)
- ).should_not.throw(RuntimeError)
+ expect(sock.sendto).when.called_with(
+ b"whatever", ("127.0.0.1", 53)
+ ).should_not.throw(RuntimeError)
sock.close()
HTTPretty.reset()
@@ -109,7 +114,7 @@ def test_httpretty_should_not_raise_on_socket_send_when_uri_not_registered():
def test_does_not_have_last_request_by_default():
- 'HTTPretty.last_request is a dummy object by default'
+ "HTTPretty.last_request is a dummy object by default"
HTTPretty.reset()
expect(HTTPretty.last_request.headers).to.be.empty
@@ -119,93 +124,95 @@ def test_does_not_have_last_request_by_default():
def test_status_codes():
"HTTPretty supports N status codes"
- expect(STATUSES).to.equal({
- 100: "Continue",
- 101: "Switching Protocols",
- 102: "Processing",
- 200: "OK",
- 201: "Created",
- 202: "Accepted",
- 203: "Non-Authoritative Information",
- 204: "No Content",
- 205: "Reset Content",
- 206: "Partial Content",
- 207: "Multi-Status",
- 208: "Already Reported",
- 226: "IM Used",
- 300: "Multiple Choices",
- 301: "Moved Permanently",
- 302: "Found",
- 303: "See Other",
- 304: "Not Modified",
- 305: "Use Proxy",
- 306: "Switch Proxy",
- 307: "Temporary Redirect",
- 308: "Permanent Redirect",
- 400: "Bad Request",
- 401: "Unauthorized",
- 402: "Payment Required",
- 403: "Forbidden",
- 404: "Not Found",
- 405: "Method Not Allowed",
- 406: "Not Acceptable",
- 407: "Proxy Authentication Required",
- 408: "Request a Timeout",
- 409: "Conflict",
- 410: "Gone",
- 411: "Length Required",
- 412: "Precondition Failed",
- 413: "Request Entity Too Large",
- 414: "Request-URI Too Long",
- 415: "Unsupported Media Type",
- 416: "Requested Range Not Satisfiable",
- 417: "Expectation Failed",
- 418: "I'm a teapot",
- 420: "Enhance Your Calm",
- 422: "Unprocessable Entity",
- 423: "Locked",
- 424: "Failed Dependency",
- 425: "Unordered Collection",
- 426: "Upgrade Required",
- 428: "Precondition Required",
- 429: "Too Many Requests",
- 431: "Request Header Fields Too Large",
- 444: "No Response",
- 449: "Retry With",
- 450: "Blocked by Windows Parental Controls",
- 451: "Unavailable For Legal Reasons",
- 494: "Request Header Too Large",
- 495: "Cert Error",
- 496: "No Cert",
- 497: "HTTP to HTTPS",
- 499: "Client Closed Request",
- 500: "Internal Server Error",
- 501: "Not Implemented",
- 502: "Bad Gateway",
- 503: "Service Unavailable",
- 504: "Gateway Timeout",
- 505: "HTTP Version Not Supported",
- 506: "Variant Also Negotiates",
- 507: "Insufficient Storage",
- 508: "Loop Detected",
- 509: "Bandwidth Limit Exceeded",
- 510: "Not Extended",
- 511: "Network Authentication Required",
- 598: "Network read timeout error",
- 599: "Network connect timeout error",
- })
+ expect(STATUSES).to.equal(
+ {
+ 100: "Continue",
+ 101: "Switching Protocols",
+ 102: "Processing",
+ 200: "OK",
+ 201: "Created",
+ 202: "Accepted",
+ 203: "Non-Authoritative Information",
+ 204: "No Content",
+ 205: "Reset Content",
+ 206: "Partial Content",
+ 207: "Multi-Status",
+ 208: "Already Reported",
+ 226: "IM Used",
+ 300: "Multiple Choices",
+ 301: "Moved Permanently",
+ 302: "Found",
+ 303: "See Other",
+ 304: "Not Modified",
+ 305: "Use Proxy",
+ 306: "Switch Proxy",
+ 307: "Temporary Redirect",
+ 308: "Permanent Redirect",
+ 400: "Bad Request",
+ 401: "Unauthorized",
+ 402: "Payment Required",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 405: "Method Not Allowed",
+ 406: "Not Acceptable",
+ 407: "Proxy Authentication Required",
+ 408: "Request a Timeout",
+ 409: "Conflict",
+ 410: "Gone",
+ 411: "Length Required",
+ 412: "Precondition Failed",
+ 413: "Request Entity Too Large",
+ 414: "Request-URI Too Long",
+ 415: "Unsupported Media Type",
+ 416: "Requested Range Not Satisfiable",
+ 417: "Expectation Failed",
+ 418: "I'm a teapot",
+ 420: "Enhance Your Calm",
+ 422: "Unprocessable Entity",
+ 423: "Locked",
+ 424: "Failed Dependency",
+ 425: "Unordered Collection",
+ 426: "Upgrade Required",
+ 428: "Precondition Required",
+ 429: "Too Many Requests",
+ 431: "Request Header Fields Too Large",
+ 444: "No Response",
+ 449: "Retry With",
+ 450: "Blocked by Windows Parental Controls",
+ 451: "Unavailable For Legal Reasons",
+ 494: "Request Header Too Large",
+ 495: "Cert Error",
+ 496: "No Cert",
+ 497: "HTTP to HTTPS",
+ 499: "Client Closed Request",
+ 500: "Internal Server Error",
+ 501: "Not Implemented",
+ 502: "Bad Gateway",
+ 503: "Service Unavailable",
+ 504: "Gateway Timeout",
+ 505: "HTTP Version Not Supported",
+ 506: "Variant Also Negotiates",
+ 507: "Insufficient Storage",
+ 508: "Loop Detected",
+ 509: "Bandwidth Limit Exceeded",
+ 510: "Not Extended",
+ 511: "Network Authentication Required",
+ 598: "Network read timeout error",
+ 599: "Network connect timeout error",
+ }
+ )
def test_uri_info_full_url():
uri_info = URIInfo(
- username='johhny',
- password='password',
- hostname=b'google.com',
+ username="johhny",
+ password="password",
+ hostname=b"google.com",
port=80,
- path=b'/',
- query=b'foo=bar&baz=test',
- fragment='',
- scheme='',
+ path=b"/",
+ query=b"foo=bar&baz=test",
+ fragment="",
+ scheme="",
)
expect(uri_info.full_url()).to.equal(
@@ -222,24 +229,24 @@ def test_uri_info_eq_ignores_case():
hostname matching.
"""
uri_info_uppercase = URIInfo(
- username='johhny',
- password='password',
- hostname=b'GOOGLE.COM',
+ username="johhny",
+ password="password",
+ hostname=b"GOOGLE.COM",
port=80,
- path=b'/',
- query=b'foo=bar&baz=test',
- fragment='',
- scheme='',
+ path=b"/",
+ query=b"foo=bar&baz=test",
+ fragment="",
+ scheme="",
)
uri_info_lowercase = URIInfo(
- username='johhny',
- password='password',
- hostname=b'google.com',
+ username="johhny",
+ password="password",
+ hostname=b"google.com",
port=80,
- path=b'/',
- query=b'foo=bar&baz=test',
- fragment='',
- scheme='',
+ path=b"/",
+ query=b"foo=bar&baz=test",
+ fragment="",
+ scheme="",
)
expect(uri_info_uppercase).to.equal(uri_info_lowercase)
@@ -256,43 +263,48 @@ def test_global_boolean_enabled():
def test_py3kobject_implements_valid__repr__based_on__str__():
class MyObject(BaseClass):
def __str__(self):
- return 'hi'
+ return "hi"
myobj = MyObject()
- expect(repr(myobj)).to.be.equal('hi')
+ expect(repr(myobj)).to.be.equal("hi")
def test_Entry_class_normalizes_headers():
- entry = Entry(HTTPretty.GET, 'http://example.com', 'example',
- host='example.com', cache_control='no-cache', x_forward_for='proxy')
+ entry = Entry(
+ HTTPretty.GET,
+ "http://example.com",
+ "example",
+ host="example.com",
+ cache_control="no-cache",
+ x_forward_for="proxy",
+ )
- entry.adding_headers.should.equal({
- 'Host': 'example.com',
- 'Cache-Control': 'no-cache',
- 'X-Forward-For': 'proxy'
- })
+ entry.adding_headers.should.equal(
+ {"Host": "example.com", "Cache-Control": "no-cache", "X-Forward-For": "proxy"}
+ )
def test_Entry_class_counts_multibyte_characters_in_bytes():
- entry = Entry(HTTPretty.GET, 'http://example.com', 'こんにちは')
+ entry = Entry(HTTPretty.GET, "http://example.com", "こんにちは")
buf = FakeSockFile()
entry.fill_filekind(buf)
response = buf.read()
- expect(b'content-length: 15\n').to.be.within(response)
+ expect(b"content-length: 15\n").to.be.within(response)
def test_Entry_class_counts_dynamic():
- result = (200, {}, 'こんにちは'.encode('utf-8'))
- entry = Entry(HTTPretty.GET, 'http://example.com', lambda *args: result)
- entry.info = URIInfo.from_uri('http://example.com', entry)
+ result = (200, {}, "こんにちは".encode("utf-8"))
+ entry = Entry(HTTPretty.GET, "http://example.com", lambda *args: result)
+ entry.info = URIInfo.from_uri("http://example.com", entry)
buf = FakeSockFile()
entry.fill_filekind(buf)
response = buf.getvalue()
- expect(b'content-length: 15\n').to.be.within(response)
+ expect(b"content-length: 15\n").to.be.within(response)
def test_fake_socket_passes_through_setblocking():
import socket
+
HTTPretty.enable()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.truesock = MagicMock()
@@ -302,6 +314,7 @@ def test_fake_socket_passes_through_setblocking():
def test_fake_socket_passes_through_fileno():
import socket
+
with httpretty.enabled():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.truesock = MagicMock()
@@ -311,15 +324,19 @@ def test_fake_socket_passes_through_fileno():
def test_fake_socket_passes_through_getsockopt():
import socket
+
HTTPretty.enable()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.truesock = MagicMock()
- expect(s.getsockopt).called_with(socket.SOL_SOCKET, 1).should_not.throw(AttributeError)
+ expect(s.getsockopt).called_with(socket.SOL_SOCKET, 1).should_not.throw(
+ AttributeError
+ )
s.truesock.getsockopt.assert_called_with(socket.SOL_SOCKET, 1)
def test_fake_socket_passes_through_bind():
import socket
+
HTTPretty.enable()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.truesock = MagicMock()
@@ -329,6 +346,7 @@ def test_fake_socket_passes_through_bind():
def test_fake_socket_passes_through_connect_ex():
import socket
+
HTTPretty.enable()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.truesock = MagicMock()
@@ -338,6 +356,7 @@ def test_fake_socket_passes_through_connect_ex():
def test_fake_socket_passes_through_listen():
import socket
+
HTTPretty.enable()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.truesock = MagicMock()
@@ -347,6 +366,7 @@ def test_fake_socket_passes_through_listen():
def test_fake_socket_passes_through_getpeername():
import socket
+
HTTPretty.enable()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.truesock = MagicMock()
@@ -356,6 +376,7 @@ def test_fake_socket_passes_through_getpeername():
def test_fake_socket_passes_through_getsockname():
import socket
+
HTTPretty.enable()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.truesock = MagicMock()
@@ -365,6 +386,7 @@ def test_fake_socket_passes_through_getsockname():
def test_fake_socket_passes_through_gettimeout():
import socket
+
HTTPretty.enable()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.truesock = MagicMock()
@@ -374,6 +396,7 @@ def test_fake_socket_passes_through_gettimeout():
def test_fake_socket_passes_through_shutdown():
import socket
+
HTTPretty.enable()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.truesock = MagicMock()
@@ -383,11 +406,12 @@ def test_fake_socket_passes_through_shutdown():
def test_unix_socket():
import socket
+
HTTPretty.enable()
# Create a UDS socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- server_address = './not-exist-socket'
+ server_address = "./not-exist-socket"
try:
sock.connect(server_address)
except socket.error:
@@ -397,32 +421,32 @@ def test_unix_socket():
def test_HTTPrettyRequest_json_body():
""" A content-type of application/json should parse a valid json body """
- header = TEST_HEADER % {'content_type': 'application/json'}
- test_dict = {'hello': 'world'}
+ header = TEST_HEADER % {"content_type": "application/json"}
+ test_dict = {"hello": "world"}
request = HTTPrettyRequest(header, json.dumps(test_dict))
expect(request.parsed_body).to.equal(test_dict)
def test_HTTPrettyRequest_invalid_json_body():
""" A content-type of application/json with an invalid json body should return the content unaltered """
- header = TEST_HEADER % {'content_type': 'application/json'}
- invalid_json = u"{'hello', 'world','thisstringdoesntstops}"
+ header = TEST_HEADER % {"content_type": "application/json"}
+ invalid_json = "{'hello', 'world','thisstringdoesntstops}"
request = HTTPrettyRequest(header, invalid_json)
expect(request.parsed_body).to.equal(invalid_json)
def test_HTTPrettyRequest_queryparam():
""" A content-type of x-www-form-urlencoded with a valid queryparam body should return parsed content """
- header = TEST_HEADER % {'content_type': 'application/x-www-form-urlencoded'}
- valid_queryparam = u"hello=world&this=isavalidquerystring"
- valid_results = {'hello': ['world'], 'this': ['isavalidquerystring']}
+ header = TEST_HEADER % {"content_type": "application/x-www-form-urlencoded"}
+ valid_queryparam = "hello=world&this=isavalidquerystring"
+ valid_results = {"hello": ["world"], "this": ["isavalidquerystring"]}
request = HTTPrettyRequest(header, valid_queryparam)
expect(request.parsed_body).to.equal(valid_results)
def test_HTTPrettyRequest_arbitrarypost():
""" A non-handled content type request's post body should return the content unaltered """
- header = TEST_HEADER % {'content_type': 'thisis/notarealcontenttype'}
+ header = TEST_HEADER % {"content_type": "thisis/notarealcontenttype"}
gibberish_body = "1234567890!@#$%^&*()"
request = HTTPrettyRequest(header, gibberish_body)
expect(request.parsed_body).to.equal(gibberish_body)
@@ -435,8 +459,9 @@ def test_socktype_bad_python_version_regression():
https://bugs.python.org/issue20386
"""
import socket
+
someObject = object()
- with patch('socket.SocketType', someObject):
+ with patch("socket.SocketType", someObject):
HTTPretty.enable()
expect(socket.SocketType).to.equal(someObject)
HTTPretty.disable()
@@ -444,7 +469,8 @@ def test_socktype_bad_python_version_regression():
def test_socktype_good_python_version():
import socket
- with patch('socket.SocketType', socket.socket):
+
+ with patch("socket.SocketType", socket.socket):
HTTPretty.enable()
expect(socket.SocketType).to.equal(socket.socket)
HTTPretty.disable()
@@ -454,21 +480,17 @@ def test_httpretty_should_allow_registering_regex_hostnames():
"HTTPretty should allow registering regexes with requests"
HTTPretty.register_uri(
- HTTPretty.GET,
- re.compile(r'^http://\w+\.foo\.com/baz$'),
- body="yay",
+ HTTPretty.GET, re.compile(r"^http://\w+\.foo\.com/baz$"), body="yay"
)
- assert HTTPretty.match_http_address('www.foo.com', 80)
+ assert HTTPretty.match_http_address("www.foo.com", 80)
def test_httpretty_should_allow_registering_regex_hostnames_ssl():
"HTTPretty should allow registering regexes with requests (ssl version)"
HTTPretty.register_uri(
- HTTPretty.GET,
- re.compile(r'^https://\w+\.foo\.com/baz$'),
- body="yay",
+ HTTPretty.GET, re.compile(r"^https://\w+\.foo\.com/baz$"), body="yay"
)
- assert HTTPretty.match_https_hostname('www.foo.com')
+ assert HTTPretty.match_https_hostname("www.foo.com")
diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py
index 6ce49d8..1c36f9e 100644
--- a/tests/unit/test_main.py
+++ b/tests/unit/test_main.py
@@ -6,7 +6,7 @@ import httpretty
from httpretty.core import HTTPrettyRequest
-@patch('httpretty.httpretty')
+@patch("httpretty.httpretty")
def test_last_request(original):
("httpretty.last_request() should return httpretty.core.last_request")
@@ -14,9 +14,11 @@ def test_last_request(original):
def test_has_request():
- ("httpretty.has_request() correctly detects "
- "whether or not a request has been made")
+ (
+ "httpretty.has_request() correctly detects "
+ "whether or not a request has been made"
+ )
httpretty.reset()
httpretty.has_request().should.be.false
- with patch('httpretty.httpretty.last_request', return_value=HTTPrettyRequest('')):
+ with patch("httpretty.httpretty.last_request", return_value=HTTPrettyRequest("")):
httpretty.has_request().should.be.true