diff options
| author | Gabriel Falcao <gabriel@nacaolivre.org> | 2013-10-14 01:10:53 -0400 |
|---|---|---|
| committer | Gabriel Falcao <gabriel@nacaolivre.org> | 2013-10-14 01:10:53 -0400 |
| commit | c88bbe9dea6170788a5f4db4bdb9d2fc119fcab0 (patch) | |
| tree | 2d1be892ada39a99c34d2b325f4c9e2e3a0867b2 /httpretty | |
| download | httpretty-gh-pages.tar.gz | |
documentationgh-pages
Diffstat (limited to 'httpretty')
| -rw-r--r-- | httpretty/__init__.py | 47 | ||||
| -rw-r--r-- | httpretty/compat.py | 90 | ||||
| -rw-r--r-- | httpretty/core.py | 1013 | ||||
| -rw-r--r-- | httpretty/errors.py | 31 | ||||
| -rw-r--r-- | httpretty/http.py | 154 | ||||
| -rw-r--r-- | httpretty/utils.py | 44 |
6 files changed, 1379 insertions, 0 deletions
diff --git a/httpretty/__init__.py b/httpretty/__init__.py new file mode 100644 index 0000000..1d7b8a6 --- /dev/null +++ b/httpretty/__init__.py @@ -0,0 +1,47 @@ +# #!/usr/bin/env python +# -*- coding: utf-8 -*- +# <HTTPretty - HTTP client mock for Python> +# Copyright (C) <2011-2013> Gabriel Falcão <gabriel@nacaolivre.org> +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +from __future__ import unicode_literals + +__version__ = version = '0.7.0' + +import sys + +from .core import httpretty, httprettified +from .errors import HTTPrettyError +from .core import URIInfo + +HTTPretty = httpretty +activate = httprettified + +SELF = sys.modules[__name__] + +for attr in [name.decode() for name in httpretty.METHODS] + ['register_uri', 'enable', 'disable', 'is_enabled', 'reset', 'Response']: + setattr(SELF, attr, getattr(httpretty, attr)) + + +def last_request(): + """returns the last request""" + return httpretty.last_request diff --git a/httpretty/compat.py b/httpretty/compat.py new file mode 100644 index 0000000..41c909c --- /dev/null +++ b/httpretty/compat.py @@ -0,0 +1,90 @@ +# #!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# <HTTPretty - HTTP client mock for Python> +# Copyright (C) <2011-2013> Gabriel Falcão <gabriel@nacaolivre.org> +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +from __future__ import unicode_literals + +import sys +import types + +PY3 = sys.version_info[0] == 3 +if PY3: # pragma: no cover + text_type = str + byte_type = bytes + import io + StringIO = io.BytesIO + basestring = (str, bytes) + + class BaseClass(object): + def __repr__(self): + return self.__str__() +else: # pragma: no cover + text_type = unicode + byte_type = str + import StringIO + StringIO = StringIO.StringIO + basestring = basestring + + +class BaseClass(object): + def __repr__(self): + ret = self.__str__() + if PY3: # pragma: no cover + return ret + else: + return ret.encode('utf-8') + + +try: # pragma: no cover + from urllib.parse import urlsplit, urlunsplit, parse_qs, quote, quote_plus, unquote +except ImportError: # pragma: no cover + from urlparse import urlsplit, urlunsplit, parse_qs, unquote + from urllib import quote, quote_plus + +try: # pragma: no cover + from http.server import BaseHTTPRequestHandler +except ImportError: # pragma: no cover + from BaseHTTPServer import BaseHTTPRequestHandler + + +ClassTypes = (type,) +if not PY3: # pragma: no cover + ClassTypes = (type, types.ClassType) + + +__all__ = [ + 'PY3', + 'StringIO', + 'text_type', + 'byte_type', + 'BaseClass', + 'BaseHTTPRequestHandler', + 'quote', + 'quote_plus', + 'urlunsplit', + 'urlsplit', + 'parse_qs', + 'ClassTypes', +] diff --git a/httpretty/core.py b/httpretty/core.py new file mode 100644 index 0000000..ddb1b1d --- /dev/null +++ b/httpretty/core.py @@ -0,0 +1,1013 @@ +# #!/usr/bin/env python +# -*- coding: utf-8 -*- +# <HTTPretty - HTTP client mock for Python> +# Copyright (C) <2011-2013> Gabriel Falcão <gabriel@nacaolivre.org> +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +from __future__ import unicode_literals + +import re +import codecs +import inspect +import socket +import functools +import itertools +import warnings +import logging +import traceback +import json +import contextlib + + +from .compat import ( + PY3, + StringIO, + text_type, + BaseClass, + BaseHTTPRequestHandler, + quote, + quote_plus, + urlunsplit, + urlsplit, + parse_qs, + unquote, + ClassTypes, + basestring +) +from .http import ( + STATUSES, + HttpBaseClass, + parse_requestline, + last_requestline, +) + +from .utils import ( + utf8, + decode_utf8, +) + +from .errors import HTTPrettyError + +from datetime import datetime +from datetime import timedelta +from errno import EAGAIN + +old_socket = socket.socket +old_create_connection = socket.create_connection +old_gethostbyname = socket.gethostbyname +old_gethostname = socket.gethostname +old_getaddrinfo = socket.getaddrinfo +old_socksocket = None +old_ssl_wrap_socket = None +old_sslwrap_simple = None +old_sslsocket = None + +if PY3: # pragma: no cover + basestring = (bytes, str) +try: # pragma: no cover + import socks + old_socksocket = socks.socksocket +except ImportError: + socks = None + +try: # pragma: no cover + import ssl + old_ssl_wrap_socket = ssl.wrap_socket + if not PY3: + old_sslwrap_simple = ssl.sslwrap_simple + old_sslsocket = ssl.SSLSocket +except ImportError: # pragma: no cover + ssl = None + + +POTENTIAL_HTTP_PORTS = set([80, 443]) +DEFAULT_HTTP_PORTS = tuple(POTENTIAL_HTTP_PORTS) + + +class HTTPrettyRequest(BaseHTTPRequestHandler, BaseClass): + """Represents a HTTP request. It takes a valid multi-line, `\r\n` + separated string with HTTP headers and parse them out using the + internal `parse_request` method. + + It also replaces the `rfile` and `wfile` attributes with StringIO + instances so that we garantee that it won't make any I/O, neighter + for writing nor reading. + + It has some convenience attributes: + + `headers` -> a mimetype object that can be cast into a dictionary, + contains all the request headers + + `method` -> the HTTP method used in this request + + `querystring` -> a dictionary containing lists with the + attributes. Please notice that if you need a single value from a + query string you will need to get it manually like: + + ```python + >>> request.querystring + {'name': ['Gabriel Falcao']} + >>> print request.querystring['name'][0] + ``` + + `parsed_body` -> a dictionary containing parsed request body or + None if HTTPrettyRequest doesn't know how to parse it. It + currently supports parsing body data that was sent under the + `content-type` headers values: 'application/json' or + 'application/x-www-form-urlencoded' + """ + def __init__(self, headers, body=''): + # first of all, lets make sure that if headers or body are + # unicode strings, it must be converted into a utf-8 encoded + # byte string + self.raw_headers = utf8(headers.strip()) + self.body = utf8(body) + + # Now let's concatenate the headers with the body, and create + # `rfile` based on it + self.rfile = StringIO(b'\r\n\r\n'.join([self.raw_headers, self.body])) + self.wfile = StringIO() # Creating `wfile` as an empty + # StringIO, just to avoid any real + # I/O calls + + # parsing the request line preemptively + self.raw_requestline = self.rfile.readline() + + # initiating the error attributes with None + self.error_code = None + self.error_message = None + + # Parse the request based on the attributes above + self.parse_request() + + # making the HTTP method string available as the command + self.method = self.command + + # Now 2 convenient attributes for the HTTPretty API: + + # `querystring` holds a dictionary with the parsed query string + self.path = decode_utf8(self.path) + + qstring = self.path.split("?", 1)[-1] + self.querystring = self.parse_querystring(qstring) + + # And the body will be attempted to be parsed as + # `application/json` or `application/x-www-form-urlencoded` + self.parsed_body = self.parse_request_body(self.body) + + def __str__(self): + return '<HTTPrettyRequest("{0}", total_headers={1}, body_length={2})>'.format( + self.headers.get('content-type', ''), + len(self.headers), + len(self.body), + ) + + def parse_querystring(self, qs): + expanded = decode_utf8(unquote(utf8(qs))) + + parsed = parse_qs(expanded) + result = {} + for k, v in parsed.iteritems(): + result[k] = map(decode_utf8, v) + + return result + + def parse_request_body(self, body): + """ Attempt to parse the post based on the content-type passed. Return the regular body if not """ + + PARSING_FUNCTIONS = { + 'application/json': json.loads, + 'text/json': json.loads, + 'application/x-www-form-urlencoded': self.parse_querystring, + } + FALLBACK_FUNCTION = lambda x: x + + content_type = self.headers.get('content-type', '') + + do_parse = PARSING_FUNCTIONS.get(content_type, FALLBACK_FUNCTION) + try: + return do_parse(body) + except: + return body + + +class EmptyRequestHeaders(dict): + pass + + +class HTTPrettyRequestEmpty(object): + body = '' + headers = EmptyRequestHeaders() + + +class FakeSockFile(StringIO): + pass + + +class FakeSSLSocket(object): + def __init__(self, sock, *args, **kw): + self._httpretty_sock = sock + + def __getattr__(self, attr): + return getattr(self._httpretty_sock, attr) + + +class fakesock(object): + class socket(object): + _entry = None + debuglevel = 0 + _sent_data = [] + + def __init__(self, family=socket.AF_INET, type=socket.SOCK_STREAM, + protocol=0): + self.setsockopt(family, type, protocol) + self.truesock = old_socket(family, type, protocol) + self._closed = True + self.fd = FakeSockFile() + self.timeout = socket._GLOBAL_DEFAULT_TIMEOUT + self._sock = self + self.is_http = False + self._bufsize = 16 + + def getpeercert(self, *a, **kw): + now = datetime.now() + shift = now + timedelta(days=30 * 12) + return { + 'notAfter': shift.strftime('%b %d %H:%M:%S GMT'), + 'subjectAltName': ( + ('DNS', '*%s' % self._host), + ('DNS', self._host), + ('DNS', '*'), + ), + 'subject': ( + ( + ('organizationName', '*.%s' % self._host), + ), + ( + ('organizationalUnitName', + 'Domain Control Validated'), + ), + ( + ('commonName', '*.%s' % self._host), + ), + ), + } + + def ssl(self, sock, *args, **kw): + return sock + + def setsockopt(self, family, type, protocol): + self.family = family + self.protocol = protocol + self.type = type + + def connect(self, address): + self._address = (self._host, self._port) = address + self._closed = False + self.is_http = self._port in POTENTIAL_HTTP_PORTS + + if not self.is_http: + self.truesock.connect(self._address) + + def close(self): + if not (self.is_http and self._closed): + self.truesock.close() + self._closed = True + + def makefile(self, mode='r', bufsize=-1): + """Returns this fake socket's own StringIO buffer. + + If there is an entry associated with the socket, the file + descriptor gets filled in with the entry data before being + returned. + """ + self._mode = mode + self._bufsize = bufsize + + if self._entry: + self._entry.fill_filekind(self.fd) + + return self.fd + + def real_sendall(self, data, *args, **kw): + """Sends data to the remote server. This method is called + when HTTPretty identifies that someone is trying to send + non-http data. + + The received bytes are written in this socket's StringIO + buffer so that HTTPretty can return it accordingly when + necessary. + """ + if self.is_http: # no need to connect if `self.is_http` is + # False because self.connect already did + # that + self.truesock.connect(self._address) + + self.truesock.settimeout(0) + self.truesock.sendall(data, *args, **kw) + + should_continue = True + while should_continue: + try: + received = self.truesock.recv(self._bufsize) + self.fd.write(received) + should_continue = len(received) > 0 + + except socket.error as e: + if e.errno == EAGAIN: + continue + break + + self.fd.seek(0) + + def sendall(self, data, *args, **kw): + self._sent_data.append(data) + + try: + requestline, _ = data.split(b'\r\n', 1) + method, path, version = parse_requestline(requestline) + is_parsing_headers = True + except ValueError: + is_parsing_headers = False + + if not self._entry: + # If the previous request wasn't mocked, don't mock the subsequent sending of data + return self.real_sendall(data, *args, **kw) + + self.fd.seek(0) + + if not is_parsing_headers: + if len(self._sent_data) > 1: + headers = utf8(last_requestline(self._sent_data)) + meta = dict(self._entry.request.headers) + body = utf8(self._sent_data[-1]) + if meta.get('transfer-encoding', '') == 'chunked': + if not body.isdigit() and body != '\r\n' and body != '0\r\n\r\n': + self._entry.request.body += body + else: + self._entry.request.body += body + + httpretty.historify_request(headers, body, False) + return + + # path might come with + s = urlsplit(path) + POTENTIAL_HTTP_PORTS.add(int(s.port or 80)) + headers, body = map(utf8, data.split(b'\r\n\r\n', 1)) + + request = httpretty.historify_request(headers, body) + + info = URIInfo(hostname=self._host, port=self._port, + path=s.path, + query=s.query, + last_request=request) + + matcher, entries = httpretty.match_uriinfo(info) + + if not entries: + self._entry = None + self.real_sendall(data) + return + + self._entry = matcher.get_next_entry(method, info, request) + + def debug(self, func, *a, **kw): + if self.is_http: + frame = inspect.stack()[0][0] + lines = map(utf8, traceback.format_stack(frame)) + + message = [ + "HTTPretty intercepted and unexpected socket method call.", + ("Please open an issue at " + "'https://github.com/gabrielfalcao/HTTPretty/issues'"), + "And paste the following traceback:\n", + "".join(decode_utf8(lines)), + ] + raise RuntimeError("\n".join(message)) + return func(*a, **kw) + + def settimeout(self, new_timeout): + self.timeout = new_timeout + + def send(self, *args, **kwargs): + return self.debug(self.truesock.send, *args, **kwargs) + + def sendto(self, *args, **kwargs): + return self.debug(self.truesock.sendto, *args, **kwargs) + + def recvfrom_into(self, *args, **kwargs): + return self.debug(self.truesock.recvfrom_into, *args, **kwargs) + + def recv_into(self, *args, **kwargs): + return self.debug(self.truesock.recv_into, *args, **kwargs) + + def recvfrom(self, *args, **kwargs): + return self.debug(self.truesock.recvfrom, *args, **kwargs) + + def recv(self, *args, **kwargs): + return self.debug(self.truesock.recv, *args, **kwargs) + + def __getattr__(self, name): + return getattr(self.truesock, name) + + +def fake_wrap_socket(s, *args, **kw): + return s + + +def create_fake_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None): + s = fakesock.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) + if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT: + s.settimeout(timeout) + if source_address: + s.bind(source_address) + s.connect(address) + return s + + +def fake_gethostbyname(host): + return '127.0.0.1' + + +def fake_gethostname(): + return 'localhost' + + +def fake_getaddrinfo( + host, port, family=None, socktype=None, proto=None, flags=None): + return [(2, 1, 6, '', (host, port))] + + +class Entry(BaseClass): + def __init__(self, method, uri, body, + adding_headers=None, + forcing_headers=None, + status=200, + streaming=False, + **headers): + + self.method = method + self.uri = uri + self.info = None + self.request = None + + self.body_is_callable = False + if hasattr(body, "__call__"): + self.callable_body = body + self.body = None + self.body_is_callable = True + elif isinstance(body, text_type): + self.body = utf8(body) + else: + self.body = body + + self.streaming = streaming + if not streaming and not self.body_is_callable: + self.body_length = len(self.body or '') + else: + self.body_length = 0 + + self.adding_headers = adding_headers or {} + self.forcing_headers = forcing_headers or {} + self.status = int(status) + + for k, v in headers.items(): + name = "-".join(k.split("_")).title() + self.adding_headers[name] = v + + self.validate() + + def validate(self): + content_length_keys = 'Content-Length', 'content-length' + for key in content_length_keys: + got = self.adding_headers.get( + key, self.forcing_headers.get(key, None)) + + if got is None: + continue + + try: + igot = int(got) + except ValueError: + warnings.warn( + 'HTTPretty got to register the Content-Length header ' \ + 'with "%r" which is not a number' % got, + ) + + if igot > self.body_length: + raise HTTPrettyError( + 'HTTPretty got inconsistent parameters. The header ' \ + 'Content-Length you registered expects size "%d" but ' \ + 'the body you registered for that has actually length ' \ + '"%d".' % ( + igot, self.body_length, + ) + ) + + def __str__(self): + return r'<Entry %s %s getting %d>' % ( + self.method, self.uri, self.status) + + def normalize_headers(self, headers): + new = {} + for k in headers: + new_k = '-'.join([s.lower() for s in k.split('-')]) + new[new_k] = headers[k] + + return new + + def fill_filekind(self, fk): + now = datetime.utcnow() + + headers = { + 'status': self.status, + 'date': now.strftime('%a, %d %b %Y %H:%M:%S GMT'), + 'server': 'Python/HTTPretty', + 'connection': 'close', + } + + if self.forcing_headers: + headers = self.forcing_headers + + if self.adding_headers: + headers.update(self.normalize_headers(self.adding_headers)) + + headers = self.normalize_headers(headers) + status = headers.get('status', self.status) + if self.body_is_callable: + status, headers, self.body = self.callable_body(self.request, self.info.full_url(), headers) + headers.update({ + 'content-length': len(self.body) + }) + + string_list = [ + 'HTTP/1.1 %d %s' % (status, STATUSES[status]), + ] + + if 'date' in headers: + string_list.append('date: %s' % headers.pop('date')) + + if not self.forcing_headers: + content_type = headers.pop('content-type', + 'text/plain; charset=utf-8') + + content_length = headers.pop('content-length', self.body_length) + + string_list.append('content-type: %s' % content_type) + if not self.streaming: + string_list.append('content-length: %s' % content_length) + + string_list.append('server: %s' % headers.pop('server')) + + for k, v in headers.items(): + string_list.append( + '{0}: {1}'.format(k, v), + ) + + for item in string_list: + fk.write(utf8(item) + b'\n') + + fk.write(b'\r\n') + + if self.streaming: + self.body, body = itertools.tee(self.body) + for chunk in body: + fk.write(utf8(chunk)) + else: + fk.write(utf8(self.body)) + + fk.seek(0) + + +def url_fix(s, charset='utf-8'): + scheme, netloc, path, querystring, fragment = urlsplit(s) + path = quote(path, b'/%') + querystring = quote_plus(querystring, b':&=') + return urlunsplit((scheme, netloc, path, querystring, fragment)) + + +class URIInfo(BaseClass): + def __init__(self, + username='', + password='', + hostname='', + port=80, + path='/', + query='', + fragment='', + scheme='', + last_request=None): + + self.username = username or '' + self.password = password or '' + self.hostname = hostname or '' + + if port: + port = int(port) + + elif scheme == 'https': + port = 443 + + self.port = port or 80 + self.path = path or '' + self.query = query or '' + self.scheme = scheme or (self.port == 443 and "https" or "http") + self.fragment = fragment or '' + self.last_request = last_request + + def __str__(self): + attrs = ( + 'username', + 'password', + 'hostname', + 'port', + 'path', + ) + fmt = ", ".join(['%s="%s"' % (k, getattr(self, k, '')) for k in attrs]) + return r'<httpretty.URIInfo(%s)>' % fmt + + def __hash__(self): + return hash(text_type(self)) + + def __eq__(self, other): + self_tuple = ( + self.port, + decode_utf8(self.hostname.lower()), + url_fix(decode_utf8(self.path)), + ) + other_tuple = ( + other.port, + decode_utf8(other.hostname.lower()), + url_fix(decode_utf8(other.path)), + ) + return self_tuple == other_tuple + + def full_url(self, use_querystring=True): + credentials = "" + if self.password: + credentials = "{0}:{1}@".format( + self.username, self.password) + + query = "" + if use_querystring and self.query: + query = "?{0}".format(decode_utf8(self.query)) + + result = "{scheme}://{credentials}{domain}{path}{query}".format( + scheme=self.scheme, + credentials=credentials, + domain=self.get_full_domain(), + path=decode_utf8(self.path), + query=query + ) + return result + + def get_full_domain(self): + hostname = decode_utf8(self.hostname) + if self.port not in DEFAULT_HTTP_PORTS: + return ":".join([hostname, str(self.port)]) + + return hostname + + @classmethod + def from_uri(cls, uri, entry): + result = urlsplit(uri) + POTENTIAL_HTTP_PORTS.add(int(result.port or 80)) + return cls(result.username, + result.password, + result.hostname, + result.port, + result.path, + result.query, + result.fragment, + result.scheme, + entry) + + +class URIMatcher(object): + regex = None + info = None + + def __init__(self, uri, entries, match_querystring=False): + self._match_querystring = match_querystring + if type(uri).__name__ == 'SRE_Pattern': + self.regex = uri + else: + self.info = URIInfo.from_uri(uri, entries) + + self.entries = entries + + #hash of current_entry pointers, per method. + self.current_entries = {} + + def matches(self, info): + if self.info: + return self.info == info + else: + return self.regex.search(info.full_url( + use_querystring=self._match_querystring)) + + def __str__(self): + wrap = 'URLMatcher({0})' + if self.info: + return wrap.format(text_type(self.info)) + else: + return wrap.format(self.regex.pattern) + + def get_next_entry(self, method, info, request): + """Cycle through available responses, but only once. + Any subsequent requests will receive the last response""" + + if method not in self.current_entries: + self.current_entries[method] = 0 + + #restrict selection to entries that match the requested method + entries_for_method = [e for e in self.entries if e.method == method] + + if self.current_entries[method] >= len(entries_for_method): + self.current_entries[method] = -1 + + if not self.entries or not entries_for_method: + raise ValueError('I have no entries for method %s: %s' + % (method, self)) + + entry = entries_for_method[self.current_entries[method]] + if self.current_entries[method] != -1: + self.current_entries[method] += 1 + + # Attach more info to the entry + # So the callback can be more clever about what to do + # This does also fix the case where the callback + # would be handed a compiled regex as uri instead of the + # real uri + entry.info = info + entry.request = request + return entry + + def __hash__(self): + return hash(text_type(self)) + + def __eq__(self, other): + return text_type(self) == text_type(other) + + +class httpretty(HttpBaseClass): + """The URI registration class""" + _entries = {} + latest_requests = [] + + last_request = HTTPrettyRequestEmpty() + _is_enabled = False + + @classmethod + def match_uriinfo(cls, info): + for matcher, value in cls._entries.items(): + if matcher.matches(info): + return (matcher, info) + + return (None, []) + + @classmethod + @contextlib.contextmanager + def record(cls, filename, indentation=4, encoding='utf-8'): + try: + import urllib3 + except ImportError: + raise RuntimeError('HTTPretty requires urllib3 installed for recording actual requests.') + + + http = urllib3.PoolManager() + + cls.enable() + calls = [] + def record_request(request, uri, headers): + cls.disable() + + response = http.request(request.method, uri) + calls.append({ + 'request': { + 'uri': uri, + 'method': request.method, + 'headers': dict(request.headers), + 'body': request.body, + 'querystring': request.querystring + }, + 'response': { + 'status': response.status, + 'body': response.data, + 'headers': dict(response.headers) + } + }) + cls.enable() + return response.status, response.headers, response.data + + for method in cls.METHODS: + cls.register_uri(method, re.compile(r'.*', re.M), body=record_request) + + yield + cls.disable() + with codecs.open(filename, 'w', encoding) as f: + f.write(json.dumps(calls, indent=indentation)) + + @classmethod + @contextlib.contextmanager + def playback(cls, origin): + cls.enable() + + data = json.loads(open(origin).read()) + for item in data: + uri = item['request']['uri'] + method = item['request']['method'] + cls.register_uri(method, uri, body=item['response']['body'], forcing_headers=item['response']['headers']) + + yield + cls.disable() + + @classmethod + def reset(cls): + cls._entries.clear() + cls.latest_requests = [] + cls.last_request = HTTPrettyRequestEmpty() + + @classmethod + def historify_request(cls, headers, body='', append=True): + request = HTTPrettyRequest(headers, body) + cls.last_request = request + if append or not cls.latest_requests: + cls.latest_requests.append(request) + else: + cls.latest_requests[-1] = request + return request + + @classmethod + def register_uri(cls, method, uri, body='HTTPretty :)', + adding_headers=None, + forcing_headers=None, + status=200, + responses=None, match_querystring=False, + **headers): + + uri_is_string = isinstance(uri, basestring) + + if uri_is_string and re.search(r'^\w+://[^/]+[.]\w{2,}$', uri): + uri += '/' + + if isinstance(responses, list) and len(responses) > 0: + for response in responses: + response.uri = uri + response.method = method + entries_for_this_uri = responses + else: + headers[str('body')] = body + headers[str('adding_headers')] = adding_headers + headers[str('forcing_headers')] = forcing_headers + headers[str('status')] = status + + entries_for_this_uri = [ + cls.Response(method=method, uri=uri, **headers), + ] + + matcher = URIMatcher(uri, entries_for_this_uri, + match_querystring) + if matcher in cls._entries: + matcher.entries.extend(cls._entries[matcher]) + del cls._entries[matcher] + + cls._entries[matcher] = entries_for_this_uri + + def __str__(self): + return '<HTTPretty with %d URI entries>' % len(self._entries) + + @classmethod + def Response(cls, body, method=None, uri=None, adding_headers=None, forcing_headers=None, + status=200, streaming=False, **headers): + + headers[str('body')] = body + headers[str('adding_headers')] = adding_headers + headers[str('forcing_headers')] = forcing_headers + headers[str('status')] = int(status) + headers[str('streaming')] = streaming + return Entry(method, uri, **headers) + + @classmethod + def disable(cls): + cls._is_enabled = False + socket.socket = old_socket + socket.SocketType = old_socket + socket._socketobject = old_socket + + socket.create_connection = old_create_connection + socket.gethostname = old_gethostname + socket.gethostbyname = old_gethostbyname + socket.getaddrinfo = old_getaddrinfo + + socket.__dict__['socket'] = old_socket + socket.__dict__['_socketobject'] = old_socket + socket.__dict__['SocketType'] = old_socket + + socket.__dict__['create_connection'] = old_create_connection + socket.__dict__['gethostname'] = old_gethostname + socket.__dict__['gethostbyname'] = old_gethostbyname + socket.__dict__['getaddrinfo'] = old_getaddrinfo + + if socks: + socks.socksocket = old_socksocket + socks.__dict__['socksocket'] = old_socksocket + + if ssl: + ssl.wrap_socket = old_ssl_wrap_socket + ssl.SSLSocket = old_sslsocket + ssl.__dict__['wrap_socket'] = old_ssl_wrap_socket + ssl.__dict__['SSLSocket'] = old_sslsocket + + if not PY3: + ssl.sslwrap_simple = old_sslwrap_simple + ssl.__dict__['sslwrap_simple'] = old_sslwrap_simple + + @classmethod + def is_enabled(cls): + return cls._is_enabled + + @classmethod + def enable(cls): + cls._is_enabled = True + socket.socket = fakesock.socket + socket._socketobject = fakesock.socket + socket.SocketType = fakesock.socket + + socket.create_connection = create_fake_connection + socket.gethostname = fake_gethostname + socket.gethostbyname = fake_gethostbyname + socket.getaddrinfo = fake_getaddrinfo + + socket.__dict__['socket'] = fakesock.socket + socket.__dict__['_socketobject'] = fakesock.socket + socket.__dict__['SocketType'] = fakesock.socket + + socket.__dict__['create_connection'] = create_fake_connection + socket.__dict__['gethostname'] = fake_gethostname + socket.__dict__['gethostbyname'] = fake_gethostbyname + socket.__dict__['getaddrinfo'] = fake_getaddrinfo + + if socks: + socks.socksocket = fakesock.socket + socks.__dict__['socksocket'] = fakesock.socket + + if ssl: + ssl.wrap_socket = fake_wrap_socket + ssl.SSLSocket = FakeSSLSocket + + ssl.__dict__['wrap_socket'] = fake_wrap_socket + ssl.__dict__['SSLSocket'] = FakeSSLSocket + + if not PY3: + ssl.sslwrap_simple = fake_wrap_socket + ssl.__dict__['sslwrap_simple'] = fake_wrap_socket + + +def httprettified(test): + "A decorator tests that use HTTPretty" + def decorate_class(klass): + for attr in dir(klass): + if not attr.startswith('test_'): + continue + + attr_value = getattr(klass, attr) + if not hasattr(attr_value, "__call__"): + continue + + setattr(klass, attr, decorate_callable(attr_value)) + return klass + + def decorate_callable(test): + @functools.wraps(test) + def wrapper(*args, **kw): + httpretty.reset() + httpretty.enable() + try: + return test(*args, **kw) + finally: + httpretty.disable() + return wrapper + + if isinstance(test, ClassTypes): + return decorate_class(test) + return decorate_callable(test) diff --git a/httpretty/errors.py b/httpretty/errors.py new file mode 100644 index 0000000..946ba9b --- /dev/null +++ b/httpretty/errors.py @@ -0,0 +1,31 @@ +# #!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# <HTTPretty - HTTP client mock for Python> +# Copyright (C) <2011-2013> Gabriel Falcão <gabriel@nacaolivre.org> +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +from __future__ import unicode_literals + + +class HTTPrettyError(Exception): + pass diff --git a/httpretty/http.py b/httpretty/http.py new file mode 100644 index 0000000..388a73a --- /dev/null +++ b/httpretty/http.py @@ -0,0 +1,154 @@ +# #!/usr/bin/env python +# -*- coding: utf-8 -*- +# <HTTPretty - HTTP client mock for Python> +# Copyright (C) <2011-2013> Gabriel Falcão <gabriel@nacaolivre.org> +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +from __future__ import unicode_literals + +import re +from .compat import BaseClass + + +STATUSES = { + 100: "Continue", + 101: "Switching Protocols", + 102: "Processing", + 200: "OK", + 201: "Created", + 202: "Accepted", + 203: "Non-Authoritative Information", + 204: "No Content", + 205: "Reset Content", + 206: "Partial Content", + 207: "Multi-Status", + 208: "Already Reported", + 226: "IM Used", + 300: "Multiple Choices", + 301: "Moved Permanently", + 302: "Found", + 303: "See Other", + 304: "Not Modified", + 305: "Use Proxy", + 306: "Switch Proxy", + 307: "Temporary Redirect", + 308: "Permanent Redirect", + 400: "Bad Request", + 401: "Unauthorized", + 402: "Payment Required", + 403: "Forbidden", + 404: "Not Found", + 405: "Method Not Allowed", + 406: "Not Acceptable", + 407: "Proxy Authentication Required", + 408: "Request a Timeout", + 409: "Conflict", + 410: "Gone", + 411: "Length Required", + 412: "Precondition Failed", + 413: "Request Entity Too Large", + 414: "Request-URI Too Long", + 415: "Unsupported Media Type", + 416: "Requested Range Not Satisfiable", + 417: "Expectation Failed", + 418: "I'm a teapot", + 420: "Enhance Your Calm", + 422: "Unprocessable Entity", + 423: "Locked", + 424: "Failed Dependency", + 424: "Method Failure", + 425: "Unordered Collection", + 426: "Upgrade Required", + 428: "Precondition Required", + 429: "Too Many Requests", + 431: "Request Header Fields Too Large", + 444: "No Response", + 449: "Retry With", + 450: "Blocked by Windows Parental Controls", + 451: "Unavailable For Legal Reasons", + 451: "Redirect", + 494: "Request Header Too Large", + 495: "Cert Error", + 496: "No Cert", + 497: "HTTP to HTTPS", + 499: "Client Closed Request", + 500: "Internal Server Error", + 501: "Not Implemented", + 502: "Bad Gateway", + 503: "Service Unavailable", + 504: "Gateway Timeout", + 505: "HTTP Version Not Supported", + 506: "Variant Also Negotiates", + 507: "Insufficient Storage", + 508: "Loop Detected", + 509: "Bandwidth Limit Exceeded", + 510: "Not Extended", + 511: "Network Authentication Required", + 598: "Network read timeout error", + 599: "Network connect timeout error", +} + + +class HttpBaseClass(BaseClass): + GET = b'GET' + PUT = b'PUT' + POST = b'POST' + DELETE = b'DELETE' + HEAD = b'HEAD' + PATCH = b'PATCH' + OPTIONS = b'OPTIONS' + CONNECT = b'CONNECT' + METHODS = (GET, PUT, POST, DELETE, HEAD, PATCH, OPTIONS, CONNECT) + + +def parse_requestline(s): + """ + http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5 + + >>> parse_requestline('GET / HTTP/1.0') + ('GET', '/', '1.0') + >>> parse_requestline('post /testurl htTP/1.1') + ('POST', '/testurl', '1.1') + >>> parse_requestline('Im not a RequestLine') + Traceback (most recent call last): + ... + ValueError: Not a Request-Line + """ + methods = b'|'.join(HttpBaseClass.METHODS) + m = re.match(br'(' + methods + b')\s+(.*)\s+HTTP/(1.[0|1])', s, re.I) + if m: + return m.group(1).upper(), m.group(2), m.group(3) + else: + raise ValueError('Not a Request-Line') + + +def last_requestline(sent_data): + """ + Find the last line in sent_data that can be parsed with parse_requestline + """ + for line in reversed(sent_data): + try: + parse_requestline(line) + except ValueError: + pass + else: + return line diff --git a/httpretty/utils.py b/httpretty/utils.py new file mode 100644 index 0000000..0480f43 --- /dev/null +++ b/httpretty/utils.py @@ -0,0 +1,44 @@ +# #!/usr/bin/env python +# -*- coding: utf-8 -*- +# <HTTPretty - HTTP client mock for Python> +# Copyright (C) <2011-2013> Gabriel Falcão <gabriel@nacaolivre.org> +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +from __future__ import unicode_literals + +from .compat import ( + byte_type, text_type +) + + +def utf8(s): + if isinstance(s, text_type): + s = s.encode('utf-8') + + return byte_type(s) + + +def decode_utf8(s): + if isinstance(s, byte_type): + s = s.decode("utf-8") + + return text_type(s) |
