diff options
author | Torsten Kurbad <github@tk-webart.de> | 2005-10-08 16:07:15 +0000 |
---|---|---|
committer | Torsten Kurbad <github@tk-webart.de> | 2005-10-08 16:07:15 +0000 |
commit | 48c9d02775699610c169bd19e017832769fe9f8c (patch) | |
tree | 3687a2895feae2b81456cf19b6f9dda61739099f | |
download | zope-publisher-monolithic-zope3-zope3-twisted-merge.tar.gz |
Merged from srichter-twisted-integration2 38370->38950monolithic-zope3-zope3-twisted-merge
-rw-r--r-- | base.py | 568 | ||||
-rw-r--r-- | http.py | 1017 | ||||
-rw-r--r-- | interfaces/__init__.py | 465 | ||||
-rw-r--r-- | tests/basetestiapplicationrequest.py | 48 | ||||
-rw-r--r-- | tests/test_baserequest.py | 96 | ||||
-rw-r--r-- | tests/test_http.py | 633 |
6 files changed, 2827 insertions, 0 deletions
@@ -0,0 +1,568 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Base implementations of the Publisher objects + +Specifically, 'BaseRequest', 'BaseResponse', and 'DefaultPublication' are +specified here. + +$Id$ +""" +import traceback +from cStringIO import StringIO + +from zope.deprecation import deprecated + +from zope.interface import implements, providedBy +from zope.interface.common.mapping import IReadMapping, IEnumerableMapping + +from zope.publisher.interfaces import IPublication, IHeld +from zope.publisher.interfaces import NotFound, DebugError, Unauthorized +from zope.publisher.interfaces import IRequest, IResponse, IDebugFlags +from zope.publisher.publish import mapply + +_marker = object() + +class BaseResponse(object): + """Base Response Class + """ + + __slots__ = ( + '_result', # The result of the application call + '_request', # The associated request (if any) + ) + + implements(IResponse) + + def __init__(self, outstream=None): + self._request = None + # BBB: This is backward-compatibility support for the deprecated + # output stream. + if outstream is not None: + import warnings + warnings.warn("Can't pass output streams to requests anymore. " + "This will go away in Zope 3.4.", + DeprecationWarning, + 2) + + def setResult(self, result): + 'See IPublisherResponse' + self._result = result + + def handleException(self, exc_info): + 'See IPublisherResponse' + f = StringIO() + traceback.print_exception( + exc_info[0], exc_info[1], exc_info[2], 100, f) + self.setResult(f.getvalue()) + + def internalError(self): + 'See IPublisherResponse' + pass + + def reset(self): + 'See IPublisherResponse' + pass + + def retry(self): + 'See IPublisherResponse' + return self.__class__() + + # BBB: Backward-compatibility for old body API + def setBody(self, body): + return self.setResult(body) + setBody = deprecated( + setBody, + 'setBody() has been deprecated in favor of setResult(). ' + 'This will go away in Zope 3.4.') + + +class RequestDataGetter(object): + + implements(IReadMapping) + + def __init__(self, request): + self.__get = getattr(request, self._gettrname) + + def __getitem__(self, name): + return self.__get(name) + + def get(self, name, default=None): + return self.__get(name, default) + + def __contains__(self, key): + lookup = self.get(key, self) + return lookup is not self + + has_key = __contains__ + +class RequestDataMapper(object): + + implements(IEnumerableMapping) + + def __init__(self, request): + self.__map = getattr(request, self._mapname) + + def __getitem__(self, name): + return self.__map[name] + + def get(self, name, default=None): + return self.__map.get(name, default) + + def __contains__(self, key): + lookup = self.get(key, self) + return lookup is not self + + has_key = __contains__ + + def keys(self): + return self.__map.keys() + + def __iter__(self): + return iter(self.keys()) + + def items(self): + return self.__map.items() + + def values(self): + return self.__map.values() + + def __len__(self): + return len(self.__map) + +class RequestDataProperty(object): + + def __init__(self, gettr_class): + self.__gettr_class = gettr_class + + def __get__(self, request, rclass=None): + if request is not None: + return self.__gettr_class(request) + + def __set__(*args): + raise AttributeError('Unassignable attribute') + + +class RequestEnvironment(RequestDataMapper): + _mapname = '_environ' + + +class DebugFlags(object): + """Debugging flags.""" + + implements(IDebugFlags) + + sourceAnnotations = False + showTAL = False + + +class BaseRequest(object): + """Represents a publishing request. + + This object provides access to request data. Request data may + vary depending on the protocol used. + + Request objects are created by the object publisher and will be + passed to published objects through the argument name, REQUEST. + + The request object is a mapping object that represents a + collection of variable to value mappings. + """ + + implements(IRequest) + + __slots__ = ( + '__provides__', # Allow request to directly provide interfaces + '_held', # Objects held until the request is closed + '_traversed_names', # The names that have been traversed + '_last_obj_traversed', # Object that was traversed last + '_traversal_stack', # Names to be traversed, in reverse order + '_environ', # The request environment variables + '_response', # The response + '_args', # positional arguments + '_body_instream', # input stream + '_body', # The request body as a string + '_publication', # publication object + '_principal', # request principal, set by publication + 'interaction', # interaction, set by interaction + 'debug', # debug flags + 'annotations', # per-package annotations + ) + + environment = RequestDataProperty(RequestEnvironment) + + def __init__(self, body_instream, environ, response=None, + positional=None, outstream=None): + + # BBB: This is backward-compatibility support for the deprecated + # output stream. + if not hasattr(environ, 'get'): + import warnings + warnings.warn("Can't pass output streams to requests anymore. " + "This will go away in Zope 3.4.", + DeprecationWarning, + 2) + environ, response, positional = response, positional, outstream + + + self._traversal_stack = [] + self._last_obj_traversed = None + self._traversed_names = [] + self._environ = environ + + self._args = positional or () + + if response is None: + self._response = self._createResponse() + else: + self._response = response + + self._response._request = self + + self._body_instream = body_instream + self._held = () + self._principal = None + self.debug = DebugFlags() + self.interaction = None + self.annotations = {} + + def setPrincipal(self, principal): + self._principal = principal + + principal = property(lambda self: self._principal) + + def _getPublication(self): + 'See IPublisherRequest' + return getattr(self, '_publication', None) + + publication = property(_getPublication) + + def processInputs(self): + 'See IPublisherRequest' + # Nothing to do here + + def retry(self): + 'See IPublisherRequest' + raise TypeError('Retry is not supported') + + def setPublication(self, pub): + 'See IPublisherRequest' + self._publication = pub + + def supportsRetry(self): + 'See IPublisherRequest' + return 0 + + def traverse(self, object): + 'See IPublisherRequest' + + publication = self.publication + + traversal_stack = self._traversal_stack + traversed_names = self._traversed_names + + self._last_obj_traversed = object + + prev_object = None + while True: + + if object is not prev_object: + # Invoke hooks (but not more than once). + publication.callTraversalHooks(self, object) + + prev_object = object + + if traversal_stack: + # Traverse to the next step. + entry_name = traversal_stack.pop() + traversed_names.append(entry_name) + subobject = publication.traverseName( + self, object, entry_name) + self._last_obj_traversed = object = subobject + else: + # Finished traversal. + break + + return object + + def close(self): + 'See IPublicationRequest' + + for held in self._held: + if IHeld.providedBy(held): + held.release() + + self._held = None + self._body_instream = None + self._publication = None + + def getPositionalArguments(self): + 'See IPublicationRequest' + return self._args + + def _getResponse(self): + return self._response + + response = property(_getResponse) + + def getTraversalStack(self): + 'See IPublicationRequest' + return list(self._traversal_stack) # Return a copy + + def hold(self, object): + 'See IPublicationRequest' + self._held = self._held + (object,) + + def setTraversalStack(self, stack): + 'See IPublicationRequest' + self._traversal_stack[:] = list(stack) + + def _getBodyStream(self): + 'See zope.publisher.interfaces.IApplicationRequest' + return self._body_instream + + bodyStream = property(_getBodyStream) + + ######################################################################## + # BBB: Deprecated; will go away in Zope 3.4 + + def _getBody(self): + body = getattr(self, '_body', None) + if body is None: + s = self._body_instream + if s is None: + return None # TODO: what should be returned here? + body = s.read() + self._body = body + return body + + body = property(_getBody) + body = deprecated(body, + 'The ``body`` attribute has been deprecated. Please ' + 'use the ``bodyStream`` attribute directly. This ' + 'attribute will go away in Zope 3.4.') + + bodyFile = bodyStream + bodyFile = deprecated(bodyFile, + 'The ``bodyFile`` attribute has been replaced by ' + '``bodyStream``, which is a more accurate name. ' + 'Streams are not necessarily files, i.e. they are ' + 'not seekable. This attribute will go away in Zope ' + '3.4.') + + ######################################################################## + + def __len__(self): + 'See Interface.Common.Mapping.IEnumerableMapping' + return len(self.keys()) + + def items(self): + 'See Interface.Common.Mapping.IEnumerableMapping' + result = [] + get = self.get + for k in self.keys(): + result.append((k, get(k))) + return result + + def keys(self): + 'See Interface.Common.Mapping.IEnumerableMapping' + return self._environ.keys() + + def __iter__(self): + return iter(self.keys()) + + def values(self): + 'See Interface.Common.Mapping.IEnumerableMapping' + result = [] + get = self.get + for k in self.keys(): + result.append(get(k)) + return result + + def __getitem__(self, key): + 'See Interface.Common.Mapping.IReadMapping' + result = self.get(key, _marker) + if result is _marker: + raise KeyError(key) + else: + return result + + def get(self, key, default=None): + 'See Interface.Common.Mapping.IReadMapping' + + result = self._environ.get(key, self) + if result is not self: return result + + return default + + def __contains__(self, key): + 'See Interface.Common.Mapping.IReadMapping' + lookup = self.get(key, self) + return lookup is not self + + has_key = __contains__ + + def _createResponse(self): + # Should be overridden by subclasses + return BaseResponse() + + def __nonzero__(self): + # This is here to avoid calling __len__ for boolean tests + return 1 + + def __str__(self): + L1 = self.items() + L1.sort() + return "\n".join(map(lambda item: "%s:\t%s" % item, L1)) + + def _setupPath_helper(self, attr): + path = self.get(attr, "/").strip() + if path.endswith('/'): + # Remove trailing backslash, so that we will not get an empty + # last entry when splitting the path. + path = path[:-1] + self._endswithslash = True + else: + self._endswithslash = False + + clean = [] + for item in path.split('/'): + if not item or item == '.': + continue + elif item == '..': + try: + del clean[-1] + except IndexError: + raise NotFound('..') + else: clean.append(item) + + clean.reverse() + self.setTraversalStack(clean) + + self._path_suffix = None + +class TestRequest(BaseRequest): + + __slots__ = ('_presentation_type', ) + + def __init__(self, path, body_instream=None, environ=None, outstream=None): + + # BBB: This is backward-compatibility support for the deprecated + # output stream. + if environ is None: + environ = {} + else: + if not hasattr(environ, 'get'): + import warnings + warnings.warn("Can't pass output streams to requests anymore. " + "This will go away in Zope 3.4.", + DeprecationWarning, + 2) + environ, outstream = outstream, environ + + environ['PATH_INFO'] = path + if body_instream is None: + body_instream = StringIO('') + + super(TestRequest, self).__init__(body_instream, environ) + self.response._outstream = outstream + + def _createResponse(self): + return BBBResponse() + +class BBBResponse(BaseResponse): + + def outputBody(self): + import warnings + warnings.warn("Can't pass output streams to requests anymore", + DeprecationWarning, + 2) + self._outstream.write(self._result) + +class DefaultPublication(object): + """A stub publication. + + This works just like Zope2's ZPublisher. It rejects any name + starting with an underscore and any objects (specifically: method) + that doesn't have a docstring. + """ + implements(IPublication) + + require_docstrings = True + + def __init__(self, app): + self.app = app + + def beforeTraversal(self, request): + # Lop off leading and trailing empty names + stack = request.getTraversalStack() + while stack and not stack[-1]: + stack.pop() # toss a trailing empty name + while stack and not stack[0]: + stack.pop(0) # toss a leading empty name + request.setTraversalStack(stack) + + def getApplication(self, request): + return self.app + + def callTraversalHooks(self, request, ob): + pass + + def traverseName(self, request, ob, name, check_auth=1): + if name.startswith('_'): + raise Unauthorized(name) + if hasattr(ob, name): + subob = getattr(ob, name) + else: + try: + subob = ob[name] + except (KeyError, IndexError, + TypeError, AttributeError): + raise NotFound(ob, name, request) + if self.require_docstrings and not getattr(subob, '__doc__', None): + raise DebugError(subob, 'Missing or empty doc string') + return subob + + def getDefaultTraversal(self, request, ob): + return ob, () + + def afterTraversal(self, request, ob): + pass + + def callObject(self, request, ob): + return mapply(ob, request.getPositionalArguments(), request) + + def afterCall(self, request, ob): + pass + + def endRequest(self, request, ob): + pass + + def handleException(self, object, request, exc_info, retry_allowed=1): + # Let the response handle it as best it can. + request.response.reset() + request.response.handleException(exc_info) + + +class TestPublication(DefaultPublication): + + def traverseName(self, request, ob, name, check_auth=1): + if hasattr(ob, name): + subob = getattr(ob, name) + else: + try: + subob = ob[name] + except (KeyError, IndexError, + TypeError, AttributeError): + raise NotFound(ob, name, request) + return subob @@ -0,0 +1,1017 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""HTTP Publisher + +$Id$ +""" +import re, time, random +from urllib import quote, unquote, splitport +from types import StringTypes, ClassType +from cgi import escape +from Cookie import SimpleCookie +from Cookie import CookieError +import logging +from tempfile import TemporaryFile + +from zope.deprecation import deprecation +from zope.interface import implements + +from zope.publisher import contenttype +from zope.publisher.interfaces.http import IHTTPCredentials +from zope.publisher.interfaces.http import IHTTPRequest +from zope.publisher.interfaces.http import IHTTPApplicationRequest +from zope.publisher.interfaces.http import IHTTPPublisher + +from zope.publisher.interfaces import Redirect +from zope.publisher.interfaces.http import IHTTPResponse, IResult +from zope.publisher.interfaces.http import IHTTPApplicationResponse +from zope.publisher.interfaces.logginginfo import ILoggingInfo +from zope.i18n.interfaces import IUserPreferredCharsets +from zope.i18n.interfaces import IUserPreferredLanguages +from zope.i18n.locales import locales, LoadLocaleError + +from zope.publisher import contenttype +from zope.publisher.base import BaseRequest, BaseResponse +from zope.publisher.base import RequestDataProperty, RequestDataMapper +from zope.publisher.base import RequestDataGetter + + +# Default Encoding +ENCODING = 'UTF-8' + +class CookieMapper(RequestDataMapper): + _mapname = '_cookies' + +class HeaderGetter(RequestDataGetter): + _gettrname = 'getHeader' + +base64 = None + +def sane_environment(env): + # return an environment mapping which has been cleaned of + # funny business such as REDIRECT_ prefixes added by Apache + # or HTTP_CGI_AUTHORIZATION hacks. + # It also makes sure PATH_INFO is a unicode string. + dict = {} + for key, val in env.items(): + while key.startswith('REDIRECT_'): + key = key[9:] + dict[key] = val + if 'HTTP_CGI_AUTHORIZATION' in dict: + dict['HTTP_AUTHORIZATION'] = dict.pop('HTTP_CGI_AUTHORIZATION') + if 'PATH_INFO' in dict: + dict['PATH_INFO'] = dict['PATH_INFO'].decode('utf-8') + return dict + +# Possible HTTP status responses +status_reasons = { +100: 'Continue', +101: 'Switching Protocols', +102: 'Processing', +200: 'OK', +201: 'Created', +202: 'Accepted', +203: 'Non-Authoritative Information', +204: 'No Content', +205: 'Reset Content', +206: 'Partial Content', +207: 'Multi-Status', +300: 'Multiple Choices', +301: 'Moved Permanently', +302: 'Moved Temporarily', +303: 'See Other', +304: 'Not Modified', +305: 'Use Proxy', +307: 'Temporary Redirect', +400: 'Bad Request', +401: 'Unauthorized', +402: 'Payment Required', +403: 'Forbidden', +404: 'Not Found', +405: 'Method Not Allowed', +406: 'Not Acceptable', +407: 'Proxy Authentication Required', +408: 'Request Time-out', +409: 'Conflict', +410: 'Gone', +411: 'Length Required', +412: 'Precondition Failed', +413: 'Request Entity Too Large', +414: 'Request-URI Too Large', +415: 'Unsupported Media Type', +416: 'Requested range not satisfiable', +417: 'Expectation Failed', +422: 'Unprocessable Entity', +423: 'Locked', +424: 'Failed Dependency', +500: 'Internal Server Error', +501: 'Not Implemented', +502: 'Bad Gateway', +503: 'Service Unavailable', +504: 'Gateway Time-out', +505: 'HTTP Version not supported', +507: 'Insufficient Storage', +} + +status_codes={} + +def init_status_codes(): + # Add mappings for builtin exceptions and + # provide text -> error code lookups. + for key, val in status_reasons.items(): + status_codes[val.replace(' ', '').lower()] = key + status_codes[val.lower()] = key + status_codes[key] = key + status_codes[str(key)] = key + + en = [n.lower() for n in dir(__builtins__) if n.endswith('Error')] + + for name in en: + status_codes[name] = 500 + +init_status_codes() + + +class URLGetter(object): + + __slots__ = "__request" + + def __init__(self, request): + self.__request = request + + def __str__(self): + return self.__request.getURL() + + def __getitem__(self, name): + url = self.get(name, None) + if url is None: + raise KeyError(name) + return url + + def get(self, name, default=None): + i = int(name) + try: + if i < 0: + i = -i + return self.__request.getURL(i) + else: + return self.__request.getApplicationURL(i) + except IndexError, v: + if v[0] == i: + return default + raise + +class HTTPInputStream(object): + """Special stream that supports caching the read data. + + This is important, so that we can retry requests. + """ + + def __init__(self, stream): + self.stream = stream + self.cacheStream = TemporaryFile() + + def getCacheStream(self): + self.read() + self.cacheStream.seek(0) + return self.cacheStream + + def read(self, size=-1): + data = self.stream.read(size) + self.cacheStream.write(data) + return data + + def readline(self): + data = self.stream.readline() + self.cacheStream.write(data) + return data + + def readlines(self, hint=None): + data = self.stream.readlines(hint) + self.cacheStream.write(''.join(data)) + return data + + +DEFAULT_PORTS = {'http': '80', 'https': '443'} +STAGGER_RETRIES = True + +class HTTPRequest(BaseRequest): + """Model HTTP request data. + + This object provides access to request data. This includes, the + input headers, form data, server data, and cookies. + + Request objects are created by the object publisher and will be + passed to published objects through the argument name, REQUEST. + + The request object is a mapping object that represents a + collection of variable to value mappings. In addition, variables + are divided into four categories: + + - Environment variables + + These variables include input headers, server data, and other + request-related data. The variable names are as <a + href="http://hoohoo.ncsa.uiuc.edu/cgi/env.html">specified</a> + in the <a + href="http://hoohoo.ncsa.uiuc.edu/cgi/interface.html">CGI + specification</a> + + - Form data + + These are data extracted from either a URL-encoded query + string or body, if present. + + - Cookies + + These are the cookie data, if present. + + - Other + + Data that may be set by an application object. + + The form attribute of a request is actually a Field Storage + object. When file uploads are used, this provides a richer and + more complex interface than is provided by accessing form data as + items of the request. See the FieldStorage class documentation + for more details. + + The request object may be used as a mapping object, in which case + values will be looked up in the order: environment variables, + other variables, form data, and then cookies. + """ + implements(IHTTPCredentials, IHTTPRequest, IHTTPApplicationRequest) + + __slots__ = ( + '__provides__', # Allow request to directly provide interfaces + '_auth', # The value of the HTTP_AUTHORIZATION header. + '_cookies', # The request cookies + '_path_suffix', # Extra traversal steps after normal traversal + '_retry_count', # How many times the request has been retried + '_app_names', # The application path as a sequence + '_app_server', # The server path of the application url + '_orig_env', # The original environment + '_endswithslash', # Does the given path end with / + 'method', # The upper-cased request method (REQUEST_METHOD) + '_locale', # The locale for the request + '_vh_root', # Object at the root of the virtual host + ) + + retry_max_count = 3 # How many times we're willing to retry + + def __init__(self, body_instream, environ, response=None, outstream=None): + # BBB: This is backward-compatibility support for the deprecated + # output stream. + try: + environ.get + except AttributeError: + import warnings + warnings.warn("Can't pass output streams to requests anymore. " + "This will go away in Zope 3.4.", + DeprecationWarning, + 2) + environ, response = response, outstream + + super(HTTPRequest, self).__init__( + HTTPInputStream(body_instream), environ, response) + + self._orig_env = environ + environ = sane_environment(environ) + + if 'HTTP_AUTHORIZATION' in environ: + self._auth = environ['HTTP_AUTHORIZATION'] + del environ['HTTP_AUTHORIZATION'] + else: + self._auth = None + + self.method = environ.get("REQUEST_METHOD", 'GET').upper() + + self._environ = environ + + self.__setupCookies() + self.__setupPath() + self.__setupURLBase() + self._vh_root = None + self.__setupLocale() + + def __setupLocale(self): + envadapter = IUserPreferredLanguages(self, None) + if envadapter is None: + self._locale = None + return + + langs = envadapter.getPreferredLanguages() + for httplang in langs: + parts = (httplang.split('-') + [None, None])[:3] + try: + self._locale = locales.getLocale(*parts) + return + except LoadLocaleError: + # Just try the next combination + pass + else: + # No combination gave us an existing locale, so use the default, + # which is guaranteed to exist + self._locale = locales.getLocale(None, None, None) + + def _getLocale(self): + return self._locale + locale = property(_getLocale) + + def __setupURLBase(self): + get_env = self._environ.get + # Get base info first. This isn't likely to cause + # errors and might be useful to error handlers. + script = get_env('SCRIPT_NAME', '').strip() + + # _script and the other _names are meant for URL construction + self._app_names = filter(None, script.split('/')) + + # get server URL and store it too, since we are already looking it up + server_url = get_env('SERVER_URL', None) + if server_url is not None: + self._app_server = server_url = server_url.strip() + else: + server_url = self.__deduceServerURL() + + if server_url.endswith('/'): + server_url = server_url[:-1] + + # strip off leading /'s of script + while script.startswith('/'): + script = script[1:] + + self._app_server = server_url + + def __deduceServerURL(self): + environ = self._environ + + if (environ.get('HTTPS', '').lower() == "on" or + environ.get('SERVER_PORT_SECURE') == "1"): + protocol = 'https' + else: + protocol = 'http' + + if environ.has_key('HTTP_HOST'): + host = environ['HTTP_HOST'].strip() + hostname, port = splitport(host) + else: + hostname = environ.get('SERVER_NAME', '').strip() + port = environ.get('SERVER_PORT', '') + + if port and port != DEFAULT_PORTS.get(protocol): + host = hostname + ':' + port + else: + host = hostname + + return '%s://%s' % (protocol, host) + + def _parseCookies(self, text, result=None): + """Parse 'text' and return found cookies as 'result' dictionary.""" + + if result is None: + result = {} + + # ignore cookies on a CookieError + try: + c = SimpleCookie(text) + except CookieError, e: + log = logging.getLogger('eventlog') + log.warn(e) + return result + + for k,v in c.items(): + result[unicode(k, ENCODING)] = unicode(v.value, ENCODING) + + return result + + def __setupCookies(self): + # Cookie values should *not* be appended to existing form + # vars with the same name - they are more like default values + # for names not otherwise specified in the form. + self._cookies = {} + cookie_header = self._environ.get('HTTP_COOKIE', None) + if cookie_header is not None: + self._parseCookies(cookie_header, self._cookies) + + def __setupPath(self): + # PATH_INFO is unicode here, so setupPath_helper sets up the + # traversal stack correctly. + self._setupPath_helper("PATH_INFO") + + def supportsRetry(self): + 'See IPublisherRequest' + count = getattr(self, '_retry_count', 0) + if count < self.retry_max_count: + if STAGGER_RETRIES: + time.sleep(random.uniform(0, 2**(count))) + return True + + def retry(self): + 'See IPublisherRequest' + count = getattr(self, '_retry_count', 0) + self._retry_count = count + 1 + + new_response = self.response.retry() + request = self.__class__( + # Use the cache stream as the new input stream. + body_instream=self._body_instream.getCacheStream(), + environ=self._orig_env, + response=new_response, + ) + request.setPublication(self.publication) + request._retry_count = self._retry_count + return request + + def traverse(self, object): + 'See IPublisherRequest' + + ob = super(HTTPRequest, self).traverse(object) + if self._path_suffix: + self._traversal_stack = self._path_suffix + ob = super(HTTPRequest, self).traverse(ob) + + return ob + + def getHeader(self, name, default=None, literal=False): + 'See IHTTPRequest' + environ = self._environ + if not literal: + name = name.replace('-', '_').upper() + val = environ.get(name, None) + if val is not None: + return val + if not name.startswith('HTTP_'): + name='HTTP_%s' % name + return environ.get(name, default) + + headers = RequestDataProperty(HeaderGetter) + + def getCookies(self): + 'See IHTTPApplicationRequest' + return self._cookies + + cookies = RequestDataProperty(CookieMapper) + + def setPathSuffix(self, steps): + 'See IHTTPRequest' + steps = list(steps) + steps.reverse() + self._path_suffix = steps + + def _authUserPW(self): + 'See IHTTPCredentials' + global base64 + if self._auth: + if self._auth.lower().startswith('basic '): + if base64 is None: + import base64 + name, password = base64.decodestring( + self._auth.split()[-1]).split(':') + return name, password + + def unauthorized(self, challenge): + 'See IHTTPCredentials' + self._response.setHeader("WWW-Authenticate", challenge, True) + self._response.setStatus(401) + + def setPrincipal(self, principal): + 'See IPublicationRequest' + super(HTTPRequest, self).setPrincipal(principal) + logging_info = ILoggingInfo(principal, None) + if logging_info is None: + message = '-' + else: + message = logging_info.getLogMessage() + self.response.authUser = message + + def _createResponse(self): + # Should be overridden by subclasses + return HTTPResponse() + + + def getURL(self, level=0, path_only=False): + names = self._app_names + self._traversed_names + if level: + if level > len(names): + raise IndexError(level) + names = names[:-level] + # See: http://www.ietf.org/rfc/rfc2718.txt, Section 2.2.5 + names = [quote(name.encode("utf-8"), safe='/+@') for name in names] + + if path_only: + if not names: + return '/' + return '/' + '/'.join(names) + else: + if not names: + return self._app_server + return "%s/%s" % (self._app_server, '/'.join(names)) + + def getApplicationURL(self, depth=0, path_only=False): + """See IHTTPApplicationRequest""" + if depth: + names = self._traversed_names + if depth > len(names): + raise IndexError(depth) + names = self._app_names + names[:depth] + else: + names = self._app_names + + # See: http://www.ietf.org/rfc/rfc2718.txt, Section 2.2.5 + names = [quote(name.encode("utf-8"), safe='/+@') for name in names] + + if path_only: + return names and ('/' + '/'.join(names)) or '/' + else: + return (names and ("%s/%s" % (self._app_server, '/'.join(names))) + or self._app_server) + + def setApplicationServer(self, host, proto='http', port=None): + if port and str(port) != DEFAULT_PORTS.get(proto): + host = '%s:%s' % (host, port) + self._app_server = '%s://%s' % (proto, host) + + def shiftNameToApplication(self): + """Add the name being traversed to the application name + + This is only allowed in the case where the name is the first name. + + A Value error is raise if the shift can't be performed. + """ + if len(self._traversed_names) == 1: + self._app_names.append(self._traversed_names.pop()) + return + + raise ValueError("Can only shift leading traversal " + "names to application names") + + def setVirtualHostRoot(self, names=()): + del self._traversed_names[:] + self._vh_root = self._last_obj_traversed + self._app_names = list(names) + + def getVirtualHostRoot(self): + return self._vh_root + + URL = RequestDataProperty(URLGetter) + + def __repr__(self): + # Returns a *short* string. + return '<%s.%s instance URL=%s>' % ( + self.__class__.__module__, self.__class__.__name__, str(self.URL)) + + def get(self, key, default=None): + 'See Interface.Common.Mapping.IReadMapping' + + result = self._cookies.get(key, self) + if result is not self: return result + + result = self._environ.get(key, self) + if result is not self: return result + + return default + + def keys(self): + 'See Interface.Common.Mapping.IEnumerableMapping' + d = {} + d.update(self._environ) + d.update(self._cookies) + return d.keys() + + +class HTTPResponse(BaseResponse): + implements(IHTTPResponse, IHTTPApplicationResponse) + + __slots__ = ( + 'authUser', # Authenticated user string + # BBB: Remove for Zope 3.4. + '_header_output', # Hook object to collaborate with a server + # for header generation. + '_headers', + '_cookies', + '_status', # The response status (usually an integer) + '_reason', # The reason that goes with the status + '_status_set', # Boolean: status explicitly set + '_charset', # String: character set for the output + ) + + + def __init__(self, header_output=None, http_transaction=None): + # BBB: Both, header_output and http_transaction have been deprecated. + if header_output is not None: + import warnings + warnings.warn( + "The header output API is completely deprecated. It's " + "intentions were not clear and it duplicated APIs in the " + "response, which you should use instead. " + "This will go away in Zope 3.4.", + DeprecationWarning, 2) + + if http_transaction is not None: + import warnings + warnings.warn( + "Storing the HTTP transaction here was a *huge* hack to " + "support transporting the authenticated user string " + "to the server. You should never rely on this variable " + "anyways. " + "This will go away in Zope 3.4.", + DeprecationWarning, 2) + + self._header_output = header_output + + super(HTTPResponse, self).__init__() + self.reset() + + + def reset(self): + 'See IResponse' + super(HTTPResponse, self).reset() + self._headers = {} + self._cookies = {} + self._status = 599 + self._reason = 'No status set' + self._status_set = False + self._charset = None + self.authUser = '-' + + def setStatus(self, status, reason=None): + 'See IHTTPResponse' + if status is None: + status = 200 + else: + if type(status) in StringTypes: + status = status.lower() + if status in status_codes: + status = status_codes[status] + else: + status = 500 + self._status = status + + if reason is None: + if status == 200: + reason = 'Ok' + elif status in status_reasons: + reason = status_reasons[status] + else: + reason = 'Unknown' + self._reason = reason + self._status_set = True + + + def getStatus(self): + 'See IHTTPResponse' + return self._status + + def getStatusString(self): + 'See IHTTPResponse' + return '%i %s' % (self._status, self._reason) + + def setHeader(self, name, value, literal=False): + 'See IHTTPResponse' + name = str(name) + value = str(value) + + if not literal: + name = name.lower() + + self._headers[name] = [value] + + + def addHeader(self, name, value): + 'See IHTTPResponse' + values = self._headers.setdefault(name, []) + values.append(value) + + + def getHeader(self, name, default=None, literal=False): + 'See IHTTPResponse' + key = name.lower() + name = literal and name or key + result = self._headers.get(name) + if result: + return result[0] + return default + + + def getHeaders(self): + 'See IHTTPResponse' + result = [] + headers = self._headers + + result.append( + ("X-Powered-By", "Zope (www.zope.org), Python (www.python.org)")) + + for key, values in headers.items(): + if key.lower() == key: + # only change non-literal header names + key = '-'.join([k.capitalize() for k in key.split('-')]) + result.extend([(key, val) for val in values]) + + result.extend([tuple(cookie.split(': ', 1)) + for cookie in self._cookie_list()]) + + return result + + + def appendToCookie(self, name, value): + 'See IHTTPResponse' + cookies = self._cookies + if name in cookies: + cookie = cookies[name] + else: + cookie = cookies[name] = {} + if 'value' in cookie: + cookie['value'] = '%s:%s' % (cookie['value'], value) + else: + cookie['value'] = value + + + def expireCookie(self, name, **kw): + 'See IHTTPResponse' + dict = {'max_age':0, 'expires':'Wed, 31-Dec-97 23:59:59 GMT'} + for k, v in kw.items(): + if v is not None: + dict[k] = v + cookies = self._cookies + if name in cookies: + # Cancel previous setCookie(). + del cookies[name] + self.setCookie(name, 'deleted', **dict) + + + def setCookie(self, name, value, **kw): + 'See IHTTPResponse' + cookies = self._cookies + cookie = cookies.setdefault(name, {}) + + for k, v in kw.items(): + if v is not None: + cookie[k.lower()] = v + + cookie['value'] = value + + + def getCookie(self, name, default=None): + 'See IHTTPResponse' + return self._cookies.get(name, default) + + + def setResult(self, result): + r = IResult(result, None) + if r is None: + if isinstance(result, basestring): + body, headers = self._implicitResult(result) + r = DirectResult((body,), headers) + elif result is None: + body, headers = self._implicitResult('') + r = DirectResult((body,), headers) + else: + raise TypeError('The result should be adaptable to IResult.') + self._result = r + self._headers.update(dict([(k, [v]) for (k, v) in r.headers])) + if not self._status_set: + self.setStatus(200) + + + def consumeBody(self): + 'See IHTTPResponse' + return ''.join(self._result.body) + + + def consumeBodyIter(self): + 'See IHTTPResponse' + return self._result.body + + + # BBB: Backward-compatibility for old body API + _body = property(consumeBody) + _body = deprecation.deprecated( + _body, + '`_body` has been deprecated in favor of `consumeBody()`. ' + 'This will go away in Zope 3.4.') + + + def _implicitResult(self, body): + encoding = getCharsetUsingRequest(self._request) or 'utf-8' + content_type = self.getHeader('content-type') + + if isinstance(body, unicode): + try: + if not content_type.startswith('text/'): + raise ValueError( + 'Unicode results must have a text content type.') + except AttributeError: + raise ValueError( + 'Unicode results must have a text content type.') + + + major, minor, params = contenttype.parse(content_type) + + if 'charset' in params: + encoding = params['charset'] + else: + content_type += ';charset=%s' %encoding + + body = body.encode(encoding) + + if content_type: + headers = [('content-type', content_type), + ('content-length', str(len(body)))] + else: + headers = [('content-length', str(len(body)))] + + return body, headers + + + def handleException(self, exc_info): + """ + Calls self.setBody() with an error response. + """ + t, v = exc_info[:2] + if isinstance(t, ClassType): + if issubclass(t, Redirect): + self.redirect(v.getLocation()) + return + title = tname = t.__name__ + else: + title = tname = unicode(t) + + # Throwing non-protocol-specific exceptions is a good way + # for apps to control the status code. + self.setStatus(tname) + + body = self._html(title, "A server error occurred." ) + self.setResult(body) + + + def internalError(self): + 'See IPublisherResponse' + self.setStatus(500, u"The engines can't take any more, Jim!") + + + def _html(self, title, content): + t = escape(title) + return ( + u"<html><head><title>%s</title></head>\n" + u"<body><h2>%s</h2>\n" + u"%s\n" + u"</body></html>\n" % + (t, t, content) + ) + + + def retry(self): + """ + Returns a response object to be used in a retry attempt + """ + return self.__class__() + + + def redirect(self, location, status=None): + """Causes a redirection without raising an error""" + if status is None: + # parse the HTTP version and set default accordingly + if (self._request.get("SERVER_PROTOCOL","HTTP/1.0") < + "HTTP/1.1"): + status=302 + else: + status=303 + + self.setStatus(status) + self.setHeader('Location', location) + self.setResult(DirectResult(())) + return location + + def _cookie_list(self): + try: + c = SimpleCookie() + except CookieError, e: + log = logging.getLogger('eventlog') + log.warn(e) + return [] + for name, attrs in self._cookies.items(): + name = str(name) + c[name] = attrs['value'].encode(ENCODING) + for k,v in attrs.items(): + if k == 'value': + continue + if k == 'secure': + if v: + c[name]['secure'] = True + continue + if k == 'max_age': + k = 'max-age' + elif k == 'comment': + # Encode rather than throw an exception + v = quote(v.encode('utf-8'), safe="/?:@&+") + c[name][k] = str(v) + return str(c).splitlines() + + +def sort_charsets(x, y): + if y[1] == 'utf-8': + return 1 + if x[1] == 'utf-8': + return -1 + return cmp(y, x) + + +class HTTPCharsets(object): + implements(IUserPreferredCharsets) + + def __init__(self, request): + self.request = request + + def getPreferredCharsets(self): + '''See interface IUserPreferredCharsets''' + charsets = [] + sawstar = sawiso88591 = 0 + header_present = 'HTTP_ACCEPT_CHARSET' in self.request + for charset in self.request.get('HTTP_ACCEPT_CHARSET', '').split(','): + charset = charset.strip().lower() + if charset: + if ';' in charset: + charset, quality = charset.split(';') + if not quality.startswith('q='): + # not a quality parameter + quality = 1.0 + else: + try: + quality = float(quality[2:]) + except ValueError: + continue + else: + quality = 1.0 + if quality == 0.0: + continue + if charset == '*': + sawstar = 1 + if charset == 'iso-8859-1': + sawiso88591 = 1 + charsets.append((quality, charset)) + # Quoting RFC 2616, $14.2: If no "*" is present in an Accept-Charset + # field, then all character sets not explicitly mentioned get a + # quality value of 0, except for ISO-8859-1, which gets a quality + # value of 1 if not explicitly mentioned. + # And quoting RFC 2616, $14.2: "If no Accept-Charset header is + # present, the default is that any character set is acceptable." + if not sawstar and not sawiso88591 and header_present: + charsets.append((1.0, 'iso-8859-1')) + # UTF-8 is **always** preferred over anything else. + # Reason: UTF-8 is not specific and can encode the entire unicode + # range , unlike many other encodings. Since Zope can easily use very + # different ranges, like providing a French-Chinese dictionary, it is + # always good to use UTF-8. + charsets.sort(sort_charsets) + return [c[1] for c in charsets] + + +def getCharsetUsingRequest(request): + 'See IHTTPResponse' + envadapter = IUserPreferredCharsets(request, None) + if envadapter is None: + return + + try: + charset = envadapter.getPreferredCharsets()[0] + except IndexError: + # Exception caused by empty list! This is okay though, since the + # browser just could have sent a '*', which means we can choose + # the encoding, which we do here now. + charset = 'utf-8' + return charset + + +class DirectResult(object): + """A generic result object. + + The result's body can be any iteratable. It is the responsibility of the + application to specify all headers related to the content, such as the + content type and length. + """ + implements(IResult) + + def __init__(self, body, headers=()): + self.body = body + self.headers = headers + + +def StrResult(body, headers=()): + """A simple string result that represents any type of data. + + It is the responsibility of the application to specify all the headers, + including content type and length. + """ + return DirectResult((body,), headers) diff --git a/interfaces/__init__.py b/interfaces/__init__.py new file mode 100644 index 0000000..0d18ebc --- /dev/null +++ b/interfaces/__init__.py @@ -0,0 +1,465 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Interfaces for the publisher. + +$Id$ +""" + +import zope.deprecation + +from zope.interface import Interface +from zope.interface import Attribute +from zope.security.interfaces import Unauthorized +from zope.component.interfaces import IPresentationRequest +from zope.interface import implements +from zope.interface.interfaces import IInterface +from zope.interface.common.mapping import IEnumerableMapping +from zope.interface.common.interfaces import IException +from zope.security.interfaces import IParticipation + +# BBB : can be remove in 3.3 +zope.deprecation.__show__.off() +from zope.exceptions import NotFoundError, INotFoundError +zope.deprecation.__show__.on() + +class IPublishingException(IException): + pass + +class PublishingException(Exception): + implements(IPublishingException) + +class ITraversalException(IPublishingException): + pass + +class TraversalException(PublishingException): + implements(ITraversalException) + +class INotFound(INotFoundError, ITraversalException): + def getObject(): + 'Returns the object that was being traversed.' + + def getName(): + 'Returns the name that was being traversed.' + +class NotFound(NotFoundError, TraversalException): + implements(INotFound) + + def __init__(self, ob, name, request=None): + self.ob = ob + self.name = name + + def getObject(self): + return self.ob + + def getName(self): + return self.name + + def __str__(self): + try: + ob = `self.ob` + except: + ob = 'unprintable object' + return 'Object: %s, name: %s' % (ob, `self.name`) + +class IDebugError(ITraversalException): + def getObject(): + 'Returns the object being traversed.' + + def getMessage(): + 'Returns the debug message.' + +class DebugError(TraversalException): + implements(IDebugError) + + def __init__(self, ob, message): + self.ob = ob + self.message = message + + def getObject(self): + return self.ob + + def getMessage(self): + return self.message + + def __str__(self): + return self.message + +class IBadRequest(IPublishingException): + def __str__(): + 'Returns the error message.' + +class BadRequest(PublishingException): + + implements(IBadRequest) + + def __init__(self, message): + self.message = message + + def __str__(self): + return self.message + +class IRedirect(IPublishingException): + def getLocation(): + 'Returns the location.' + +class Redirect(PublishingException): + + implements(IRedirect) + + def __init__(self, location): + self.location = location + + def getLocation(self): + return self.location + + def __str__(self): + return 'Location: %s' % self.location + +class IRetry(IPublishingException): + def getOriginalException(): + 'Returns the original exception object.' + +class Retry(PublishingException): + """Raise this to retry a request.""" + + implements(IRetry) + + def __init__(self, orig_exc=None): + self.orig_exc = orig_exc + + def getOriginalException(self): + return self.orig_exc + + def __str__(self): + return repr(self.orig_exc) + + +class IExceptionSideEffects(Interface): + """An exception caught by the publisher is adapted to this so that + it can have persistent side-effects.""" + + def __call__(obj, request, exc_info): + """Effect persistent side-effects. + + Arguments are: + obj context-wrapped object that was published + request the request + exc_info the exception info being handled + + """ + + +class IPublishTraverse(Interface): + + def publishTraverse(request, name): + """Lookup a name + + The request argument is the publisher request object. + + If a lookup is not possible, raise a NotFound error. + + This method should return an object having the specified name and + `self` as parent. The method can use the request to determine the + correct object. + """ + + +class IPublisher(Interface): + + def publish(request): + """Publish a request + + The request must be an IPublisherRequest. + """ + +class IResponse(Interface): + """Interface used by the publsher""" + + def setResult(result): + """Sets the response result value. + """ + + def handleException(exc_info): + """Handles an otherwise unhandled exception. + + The publication object gets the first chance to handle an exception, + and if it doesn't have a good way to do it, it defers to the + response. Implementations should set the reponse body. + """ + + def internalError(): + """Called when the exception handler bombs. + + Should report back to the client that an internal error occurred. + """ + + def reset(): + """Reset the output result. + + Reset the response by nullifying already set variables. + """ + + def retry(): + """Returns a retry response + + Returns a response suitable for repeating the publication attempt. + """ + + +class IPublication(Interface): + """Object publication framework. + + The responsibility of publication objects is to provide + application hooks for the publishing process. This allows + application-specific tasks, such as connecting to databases, + managing transactions, and setting security contexts to be invoked + during the publishing process. + """ + # The order of the hooks mostly corresponds with the order in which + # they are invoked. + + def beforeTraversal(request): + """Pre-traversal hook. + + This is called *once* before any traversal has been done. + """ + + def getApplication(request): + """Returns the object where traversal should commence. + """ + + def callTraversalHooks(request, ob): + """Invokes any traversal hooks associated with the object. + + This is called before traversing each object. The ob argument + is the object that is about to be traversed. + """ + + def traverseName(request, ob, name): + """Traverses to the next object. + """ + + def afterTraversal(request, ob): + """Post-traversal hook. + + This is called after all traversal. + """ + + def callObject(request, ob): + """Call the object, returning the result. + + For GET/POST this means calling it, but for other methods + (including those of WebDAV and FTP) this might mean invoking + a method of an adapter. + """ + + def afterCall(request, ob): + """Post-callObject hook (if it was successful). + """ + + def handleException(object, request, exc_info, retry_allowed=1): + """Handle an exception + + Either: + - sets the body of the response, request.response, or + - raises a Retry exception, or + - throws another exception, which is a Bad Thing. + + Note that this method should not leak, which means that + exc_info must be set to some other value before exiting the method. + """ + + def endRequest(request, ob): + """Do any end-of-request cleanup + """ + + +class IPublicationRequest(IPresentationRequest, IParticipation): + """Interface provided by requests to IPublication objects + """ + + response = Attribute("""The request's response object + + Return an IPublisherResponse for the request. + """) + + def close(): + """Release resources held by the request. + """ + + def hold(held): + """Hold a reference to an object until the request is closed. + + The object should be an IHeld. If it is an IHeld, it's + release method will be called when it is released. + """ + + def getTraversalStack(): + """Return the request traversal stack + + This is a sequence of steps to traverse in reverse order. They + will be traversed from last to first. + """ + + def setTraversalStack(stack): + """Change the traversal stack. + + See getTraversalStack. + """ + + def getPositionalArguments(): + """Return the positional arguments given to the request. + """ + + def setPrincipal(principal): + """Set the principal attribute. + + It should be IPrincipal wrapped in it's AuthenticationService's context. + """ + +class IHeld(Interface): + """Object to be held and explicitly released by a request + """ + + def release(): + """Release the held object + + This is called by a request that holds the IHeld when the + request is closed + + """ + +class IPublisherRequest(IPublicationRequest): + """Request interface use by the publisher + + The responsibility of requests is to encapsulate protocol + specific details, especially wrt request inputs. + + Request objects also serve as "context" objects, providing + construction of and access to responses and storage of publication + objects. + """ + + def supportsRetry(): + """Check whether the request supports retry + + Return a boolean value indicating whether the request can be retried. + """ + + def retry(): + """Return a retry request + + Return a request suitable for repeating the publication attempt. + """ + + publication = Attribute("""The request's publication object + + The publication object, an IRequestPublication provides + application-specific functionality hooks. + """) + + def setPublication(publication): + """Set the request's publication object + """ + + def traverse(object): + """Traverse from the given object to the published object + + The published object is returned. + + The following hook methods on the publication will be called: + + - callTraversalHooks is called before each step and after + the last step. + + - traverseName to actually do a single traversal + + """ + + def processInputs(): + """Do any input processing that needs to be done before traversing + + This is done after construction to allow the publisher to + handle errors that arise. + """ + + +class IDebugFlags(Interface): + """Features that support debugging.""" + + sourceAnnotations = Attribute("""Enable ZPT source annotations""") + showTAL = Attribute("""Leave TAL markup in rendered page templates""") + + +class IApplicationRequest(IEnumerableMapping): + """Features that support application logic + """ + + principal = Attribute("""Principal object associated with the request + This is a read-only attribute. + """) + + bodyStream = Attribute( + """The stream that provides the data of the request. + + The data returned by the stream will not include any possible header + information, which should have been stripped by the server (or + previous layer) before. + + Also, the body stream might already be read and not return any + data. This is commonly done when retrieving the data for the ``body`` + attribute. + + If you access this stream directly to retrieve data, it will not be + possible by other parts of the framework to access the data of the + request via the ``body`` attribute.""") + + debug = Attribute("""Debug flags (see IDebugFlags).""") + + def __getitem__(key): + """Return request data + + The only request data are environment variables. + """ + + environment = Attribute( + """Request environment data + + This is a read-only mapping from variable name to value. + """) + + annotations = Attribute( + """Stores arbitrary application data under package-unique keys. + + By "package-unique keys", we mean keys that are are unique by + virtue of including the dotted name of a package as a prefex. A + package name is used to limit the authority for picking names for + a package to the people using that package. + + For example, when implementing annotations for hypothetical + request-persistent adapters in a hypothetical zope.persistentadapter + package, the key would be (or at least begin with) the following:: + + "zope.persistentadapter" + """) + + +class IRequest(IPublisherRequest, IPublicationRequest, IApplicationRequest): + """The basic request contract + """ + + +class ILayer(IInterface): + """A grouping of related views for a request.""" + diff --git a/tests/basetestiapplicationrequest.py b/tests/basetestiapplicationrequest.py new file mode 100644 index 0000000..fea56c8 --- /dev/null +++ b/tests/basetestiapplicationrequest.py @@ -0,0 +1,48 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""IApplicationRequest Base Test + +$Id$ +""" +from zope.interface.verify import verifyObject +from zope.publisher.interfaces import IApplicationRequest + +from zope.interface.common.tests.basemapping import BaseTestIEnumerableMapping + +from zope.interface.common.tests.basemapping import testIReadMapping + + +class BaseTestIApplicationRequest(BaseTestIEnumerableMapping): + def testVerifyIApplicationRequest(self): + verifyObject(IApplicationRequest, self._Test__new()) + + def testHaveCustomTestsForIApplicationRequest(self): + # Make sure that tests are defined for things we can't test here + self.test_IApplicationRequest_bodyStream + + def testEnvironment(self): + request = self._Test__new(foo='Foo', bar='Bar') + + try: + request.environment = {} + except AttributeError: + pass + else: + raise "Shouldn't be able to set environment" + + environment = request.environment + + testIReadMapping(self, environment, + {'foo': 'Foo', 'bar': 'Bar'}, + ['splat']) diff --git a/tests/test_baserequest.py b/tests/test_baserequest.py new file mode 100644 index 0000000..9f7ceff --- /dev/null +++ b/tests/test_baserequest.py @@ -0,0 +1,96 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""baserequest tests + +$Id$ +""" +from unittest import TestCase, main, makeSuite + +from zope.publisher.tests.basetestipublicationrequest \ + import BaseTestIPublicationRequest + +from zope.publisher.tests.basetestipublisherrequest \ + import BaseTestIPublisherRequest + +from zope.publisher.tests.basetestiapplicationrequest \ + import BaseTestIApplicationRequest + +from StringIO import StringIO + +class TestBaseRequest(BaseTestIPublicationRequest, + BaseTestIApplicationRequest, + BaseTestIPublisherRequest, + TestCase): + + def _Test__new(self, **kw): + from zope.publisher.base import BaseRequest + return BaseRequest(StringIO(''), kw) + + def _Test__expectedViewType(self): + return None # we don't expect + + def test_IApplicationRequest_bodyStream(self): + from zope.publisher.base import BaseRequest + + request = BaseRequest(StringIO('spam'), {}) + self.assertEqual(request.bodyStream.read(), 'spam') + + def test_IPublicationRequest_getPositionalArguments(self): + self.assertEqual(self._Test__new().getPositionalArguments(), ()) + + def test_IPublisherRequest_retry(self): + self.assertEqual(self._Test__new().supportsRetry(), 0) + + def test_IPublisherRequest_traverse(self): + from zope.publisher.tests.publication import TestPublication + request = self._Test__new() + request.setPublication(TestPublication()) + app = request.publication.getApplication(request) + + request.setTraversalStack([]) + self.assertEqual(request.traverse(app).name, '') + self.assertEqual(request._last_obj_traversed, app) + request.setTraversalStack(['ZopeCorp']) + self.assertEqual(request.traverse(app).name, 'ZopeCorp') + self.assertEqual(request._last_obj_traversed, app.ZopeCorp) + request.setTraversalStack(['Engineering', 'ZopeCorp']) + self.assertEqual(request.traverse(app).name, 'Engineering') + self.assertEqual(request._last_obj_traversed, app.ZopeCorp.Engineering) + + def test_IPublisherRequest_processInputs(self): + self._Test__new().processInputs() + + def test_AnnotationsExist(self): + self.assertEqual(self._Test__new().annotations, {}) + + # Needed by BaseTestIEnumerableMapping tests: + def _IEnumerableMapping__stateDict(self): + return {'id': 'ZopeOrg', 'title': 'Zope Community Web Site', + 'greet': 'Welcome to the Zope Community Web site'} + + def _IEnumerableMapping__sample(self): + return self._Test__new(**(self._IEnumerableMapping__stateDict())) + + def _IEnumerableMapping__absentKeys(self): + return 'foo', 'bar' + + def test_SetRequestInResponse(self): + request = self._Test__new() + self.assertEqual(request.response._request, request) + +def test_suite(): + return makeSuite(TestBaseRequest) + +if __name__=='__main__': + main(defaultTest='test_suite') diff --git a/tests/test_http.py b/tests/test_http.py new file mode 100644 index 0000000..3573356 --- /dev/null +++ b/tests/test_http.py @@ -0,0 +1,633 @@ +# -*- coding: latin-1 -*- +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""HTTP Publisher Tests + +$Id$ +""" +import unittest + +from zope.interface import implements +from zope.publisher.interfaces.logginginfo import ILoggingInfo +from zope.publisher.http import HTTPRequest, HTTPResponse +from zope.publisher.http import HTTPInputStream, StrResult +from zope.publisher.publish import publish +from zope.publisher.base import DefaultPublication +from zope.publisher.interfaces.http import IHTTPRequest, IHTTPResponse +from zope.publisher.interfaces.http import IHTTPApplicationResponse +from zope.publisher.interfaces import IResponse + +from zope.i18n.interfaces.locales import ILocale + +from zope.interface.verify import verifyObject + +from StringIO import StringIO +from Cookie import CookieError + + +class UserStub(object): + implements(ILoggingInfo) + + def __init__(self, id): + self._id = id + + def getId(self): + return self._id + + def getLogMessage(self): + return self._id + + +data = '''\ +line 1 +line 2 +line 3''' + + +class HTTPInputStreamTests(unittest.TestCase): + + def setUp(self): + self.stream = HTTPInputStream(StringIO(data)) + + def getCacheStreamValue(self): + self.stream.cacheStream.seek(0) + return self.stream.cacheStream.read() + + def testRead(self): + output = '' + self.assertEqual(output, self.getCacheStreamValue()) + output += self.stream.read(5) + self.assertEqual(output, self.getCacheStreamValue()) + output += self.stream.read() + self.assertEqual(output, self.getCacheStreamValue()) + self.assertEqual(data, self.getCacheStreamValue()) + + def testReadLine(self): + output = self.stream.readline() + self.assertEqual(output, self.getCacheStreamValue()) + output += self.stream.readline() + self.assertEqual(output, self.getCacheStreamValue()) + output += self.stream.readline() + self.assertEqual(output, self.getCacheStreamValue()) + output += self.stream.readline() + self.assertEqual(output, self.getCacheStreamValue()) + self.assertEqual(data, self.getCacheStreamValue()) + + def testReadLines(self): + output = ''.join(self.stream.readlines(4)) + self.assertEqual(output, self.getCacheStreamValue()) + output += ''.join(self.stream.readlines()) + self.assertEqual(output, self.getCacheStreamValue()) + self.assertEqual(data, self.getCacheStreamValue()) + + def testGetChacheStream(self): + self.stream.read(5) + self.assertEqual(data, self.stream.getCacheStream().read()) + + +class HTTPTests(unittest.TestCase): + + _testEnv = { + 'PATH_INFO': '/folder/item', + 'a': '5', + 'b': 6, + 'SERVER_URL': 'http://foobar.com', + 'HTTP_HOST': 'foobar.com', + 'CONTENT_LENGTH': '0', + 'HTTP_AUTHORIZATION': 'Should be in accessible', + 'GATEWAY_INTERFACE': 'TestFooInterface/1.0', + 'HTTP_OFF_THE_WALL': "Spam 'n eggs", + 'HTTP_ACCEPT_CHARSET': 'ISO-8859-1, UTF-8;q=0.66, UTF-16;q=0.33', + } + + def setUp(self): + class AppRoot(object): + """Required docstring for the publisher.""" + + class Folder(object): + """Required docstring for the publisher.""" + + class Item(object): + """Required docstring for the publisher.""" + def __call__(self, a, b): + return "%s, %s" % (`a`, `b`) + + self.app = AppRoot() + self.app.folder = Folder() + self.app.folder.item = Item() + self.app.xxx = Item() + + def _createRequest(self, extra_env={}, body=""): + env = self._testEnv.copy() + env.update(extra_env) + if len(body): + env['CONTENT_LENGTH'] = str(len(body)) + + publication = DefaultPublication(self.app) + instream = StringIO(body) + request = HTTPRequest(instream, env) + request.setPublication(publication) + return request + + def _publisherResults(self, extra_env={}, body=""): + request = self._createRequest(extra_env, body) + response = request.response + publish(request, handle_errors=False) + headers = response.getHeaders() + headers.sort() + return ( + "Status: %s\r\n" % response.getStatusString() + + + "\r\n".join([("%s: %s" % h) for h in headers]) + "\r\n\r\n" + + + ''.join(response.consumeBody()) + ) + + def test_repr(self): + request = self._createRequest() + expect = '<%s.%s instance URL=http://foobar.com>' % ( + request.__class__.__module__, request.__class__.__name__) + self.assertEqual(repr(request), expect) + + def testTraversalToItem(self): + res = self._publisherResults() + self.failUnlessEqual( + res, + "Status: 200 Ok\r\n" + "Content-Length: 6\r\n" + "X-Powered-By: Zope (www.zope.org), Python (www.python.org)\r\n" + "\r\n" + "'5', 6") + + def testRedirect(self): + # test HTTP/1.0 + env = {'SERVER_PROTOCOL':'HTTP/1.0'} + + request = self._createRequest(env, '') + location = request.response.redirect('http://foobar.com/redirected') + self.assertEquals(location, 'http://foobar.com/redirected') + self.assertEquals(request.response.getStatus(), 302) + self.assertEquals(request.response.getHeader('location'), location) + + # test HTTP/1.1 + env = {'SERVER_PROTOCOL':'HTTP/1.1'} + + request = self._createRequest(env, '') + location = request.response.redirect('http://foobar.com/redirected') + self.assertEquals(request.response.getStatus(), 303) + + # test explicit status + request = self._createRequest(env, '') + request.response.redirect('http://foobar.com/explicit', 304) + self.assertEquals(request.response.getStatus(), 304) + + def testRequestEnvironment(self): + req = self._createRequest() + publish(req, handle_errors=0) # Force expansion of URL variables + + self.assertEquals(str(req.URL), 'http://foobar.com/folder/item') + self.assertEquals(req.URL['-1'], 'http://foobar.com/folder') + self.assertEquals(req.URL['-2'], 'http://foobar.com') + self.assertRaises(KeyError, req.URL.__getitem__, '-3') + + self.assertEquals(req.URL['0'], 'http://foobar.com') + self.assertEquals(req.URL['1'], 'http://foobar.com/folder') + self.assertEquals(req.URL['2'], 'http://foobar.com/folder/item') + self.assertRaises(KeyError, req.URL.__getitem__, '3') + + self.assertEquals(req.URL.get('0'), 'http://foobar.com') + self.assertEquals(req.URL.get('1'), 'http://foobar.com/folder') + self.assertEquals(req.URL.get('2'), 'http://foobar.com/folder/item') + self.assertEquals(req.URL.get('3', 'none'), 'none') + + self.assertEquals(req['SERVER_URL'], 'http://foobar.com') + self.assertEquals(req['HTTP_HOST'], 'foobar.com') + self.assertEquals(req['PATH_INFO'], '/folder/item') + self.assertEquals(req['CONTENT_LENGTH'], '0') + self.assertRaises(KeyError, req.__getitem__, 'HTTP_AUTHORIZATION') + self.assertEquals(req['GATEWAY_INTERFACE'], 'TestFooInterface/1.0') + self.assertEquals(req['HTTP_OFF_THE_WALL'], "Spam 'n eggs") + + self.assertRaises(KeyError, req.__getitem__, + 'HTTP_WE_DID_NOT_PROVIDE_THIS') + + def testRequestLocale(self): + eq = self.assertEqual + unless = self.failUnless + + from zope.component import provideAdapter + from zope.publisher.browser import BrowserLanguages + from zope.publisher.interfaces.http import IHTTPRequest + from zope.i18n.interfaces import IUserPreferredLanguages + provideAdapter(BrowserLanguages, [IHTTPRequest], + IUserPreferredLanguages) + + for httplang in ('it', 'it-ch', 'it-CH', 'IT', 'IT-CH', 'IT-ch'): + req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': httplang}) + locale = req.locale + unless(ILocale.providedBy(locale)) + parts = httplang.split('-') + lang = parts.pop(0).lower() + territory = variant = None + if parts: + territory = parts.pop(0).upper() + if parts: + variant = parts.pop(0).upper() + eq(locale.id.language, lang) + eq(locale.id.territory, territory) + eq(locale.id.variant, variant) + # Now test for non-existant locale fallback + req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': 'xx'}) + locale = req.locale + unless(ILocale.providedBy(locale)) + eq(locale.id.language, None) + eq(locale.id.territory, None) + eq(locale.id.variant, None) + + # If the first language is not available we should try others + req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': 'xx,en;q=0.5'}) + locale = req.locale + unless(ILocale.providedBy(locale)) + eq(locale.id.language, 'en') + eq(locale.id.territory, None) + eq(locale.id.variant, None) + + # Regression test: there was a bug where territory and variant were + # not reset + req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': 'xx-YY,en;q=0.5'}) + locale = req.locale + unless(ILocale.providedBy(locale)) + eq(locale.id.language, 'en') + eq(locale.id.territory, None) + eq(locale.id.variant, None) + + from zope.component.testing import tearDown + tearDown() + + def testCookies(self): + cookies = { + 'HTTP_COOKIE': + 'foo=bar; path=/; spam="eggs", this="Should be accepted"' + } + req = self._createRequest(extra_env=cookies) + + self.assertEquals(req.cookies[u'foo'], u'bar') + self.assertEquals(req[u'foo'], u'bar') + + self.assertEquals(req.cookies[u'spam'], u'eggs') + self.assertEquals(req[u'spam'], u'eggs') + + self.assertEquals(req.cookies[u'this'], u'Should be accepted') + self.assertEquals(req[u'this'], u'Should be accepted') + + # Reserved key + self.failIf(req.cookies.has_key('path')) + + def testCookieErrorToLog(self): + cookies = { + 'HTTP_COOKIE': + 'foo=bar; path=/; spam="eggs", ldap/OU="Williams"' + } + req = self._createRequest(extra_env=cookies) + + self.failIf(req.cookies.has_key('foo')) + self.failIf(req.has_key('foo')) + + self.failIf(req.cookies.has_key('spam')) + self.failIf(req.has_key('spam')) + + self.failIf(req.cookies.has_key('ldap/OU')) + self.failIf(req.has_key('ldap/OU')) + + # Reserved key + self.failIf(req.cookies.has_key('path')) + + def testCookiesUnicode(self): + # Cookie values are assumed to be UTF-8 encoded + cookies = {'HTTP_COOKIE': r'key="\342\230\243";'} + req = self._createRequest(extra_env=cookies) + self.assertEquals(req.cookies[u'key'], u'\N{BIOHAZARD SIGN}') + + def testHeaders(self): + headers = { + 'TEST_HEADER': 'test', + 'Another-Test': 'another', + } + req = self._createRequest(extra_env=headers) + self.assertEquals(req.headers[u'TEST_HEADER'], u'test') + self.assertEquals(req.headers[u'TEST-HEADER'], u'test') + self.assertEquals(req.headers[u'test_header'], u'test') + self.assertEquals(req.getHeader('TEST_HEADER', literal=True), u'test') + self.assertEquals(req.getHeader('TEST-HEADER', literal=True), None) + self.assertEquals(req.getHeader('test_header', literal=True), None) + self.assertEquals(req.getHeader('Another-Test', literal=True), + 'another') + + def testBasicAuth(self): + from zope.publisher.interfaces.http import IHTTPCredentials + import base64 + req = self._createRequest() + verifyObject(IHTTPCredentials, req) + lpq = req._authUserPW() + self.assertEquals(lpq, None) + env = {} + login, password = ("tim", "123") + s = base64.encodestring("%s:%s" % (login, password)).rstrip() + env['HTTP_AUTHORIZATION'] = "Basic %s" % s + req = self._createRequest(env) + lpw = req._authUserPW() + self.assertEquals(lpw, (login, password)) + + def testSetPrincipal(self): + req = self._createRequest() + req.setPrincipal(UserStub("jim")) + self.assertEquals(req.response.authUser, 'jim') + + def test_method(self): + r = self._createRequest(extra_env={'REQUEST_METHOD':'SPAM'}) + self.assertEqual(r.method, 'SPAM') + r = self._createRequest(extra_env={'REQUEST_METHOD':'eggs'}) + self.assertEqual(r.method, 'EGGS') + + def test_setApplicationServer(self): + req = self._createRequest() + req.setApplicationServer('foo') + self.assertEquals(req._app_server, 'http://foo') + req.setApplicationServer('foo', proto='https') + self.assertEquals(req._app_server, 'https://foo') + req.setApplicationServer('foo', proto='https', port=8080) + self.assertEquals(req._app_server, 'https://foo:8080') + req.setApplicationServer('foo', proto='http', port='9673') + self.assertEquals(req._app_server, 'http://foo:9673') + req.setApplicationServer('foo', proto='https', port=443) + self.assertEquals(req._app_server, 'https://foo') + req.setApplicationServer('foo', proto='https', port='443') + self.assertEquals(req._app_server, 'https://foo') + req.setApplicationServer('foo', port=80) + self.assertEquals(req._app_server, 'http://foo') + req.setApplicationServer('foo', proto='telnet', port=80) + self.assertEquals(req._app_server, 'telnet://foo:80') + + def test_setApplicationNames(self): + req = self._createRequest() + names = ['x', 'y', 'z'] + req.setVirtualHostRoot(names) + self.assertEquals(req._app_names, ['x', 'y', 'z']) + names[0] = 'muahahahaha' + self.assertEquals(req._app_names, ['x', 'y', 'z']) + + def test_setVirtualHostRoot(self): + req = self._createRequest() + req._traversed_names = ['x', 'y'] + req._last_obj_traversed = object() + req.setVirtualHostRoot() + self.failIf(req._traversed_names) + self.assertEquals(req._vh_root, req._last_obj_traversed) + + def test_getVirtualHostRoot(self): + req = self._createRequest() + self.assertEquals(req.getVirtualHostRoot(), None) + req._vh_root = object() + self.assertEquals(req.getVirtualHostRoot(), req._vh_root) + + def test_traverse(self): + req = self._createRequest() + req.traverse(self.app) + self.assertEquals(req._traversed_names, ['folder', 'item']) + + # setting it during traversal matters + req = self._createRequest() + def hook(self, object, req=req, app=self.app): + if object is app.folder: + req.setVirtualHostRoot() + req.publication.callTraversalHooks = hook + req.traverse(self.app) + self.assertEquals(req._traversed_names, ['item']) + self.assertEquals(req._vh_root, self.app.folder) + + def testInterface(self): + from zope.publisher.interfaces.http import IHTTPCredentials + from zope.publisher.interfaces.http import IHTTPApplicationRequest + rq = self._createRequest() + verifyObject(IHTTPRequest, rq) + verifyObject(IHTTPCredentials, rq) + verifyObject(IHTTPApplicationRequest, rq) + + def testDeduceServerURL(self): + req = self._createRequest() + deduceServerURL = req._HTTPRequest__deduceServerURL + req._environ = {'HTTP_HOST': 'example.com:80'} + self.assertEquals(deduceServerURL(), 'http://example.com') + req._environ = {'HTTP_HOST': 'example.com:8080'} + self.assertEquals(deduceServerURL(), 'http://example.com:8080') + req._environ = {'HTTP_HOST': 'example.com:443', 'HTTPS': 'on'} + self.assertEquals(deduceServerURL(), 'https://example.com') + req._environ = {'HTTP_HOST': 'example.com:80', 'HTTPS': 'ON'} + self.assertEquals(deduceServerURL(), 'https://example.com:80') + req._environ = {'HTTP_HOST': 'example.com:8080', + 'SERVER_PORT_SECURE': '1'} + self.assertEquals(deduceServerURL(), 'https://example.com:8080') + req._environ = {'SERVER_NAME': 'example.com', 'SERVER_PORT':'8080', + 'SERVER_PORT_SECURE': '0'} + self.assertEquals(deduceServerURL(), 'http://example.com:8080') + req._environ = {'SERVER_NAME': 'example.com'} + self.assertEquals(deduceServerURL(), 'http://example.com') + + def testUnicodeURLs(self): + # The request expects PATH_INFO to be utf-8 encoded when it gets it. + req = self._createRequest( + {'PATH_INFO': '/\xc3\xa4\xc3\xb6/\xc3\xbc\xc3\x9f/foo/bar.html'}) + self.assertEqual(req._traversal_stack, + [u'bar.html', u'foo', u'üß', u'äö']) + # the request should have converted PATH_INFO to unicode + self.assertEqual(req['PATH_INFO'], u'/äö/üß/foo/bar.html') + + +class ConcreteHTTPTests(HTTPTests): + """Tests that we don't have to worry about subclasses inheriting and + breaking. + """ + + def test_shiftNameToApplication(self): + r = self._createRequest() + publish(r, handle_errors=0) + appurl = r.getApplicationURL() + + # Verify that we can shift. It would be a little more realistic + # if we could test this during traversal, but the api doesn't + # let us do that. + r = self._createRequest(extra_env={"PATH_INFO": "/xxx"}) + publish(r, handle_errors=0) + r.shiftNameToApplication() + self.assertEquals(r.getApplicationURL(), appurl+"/xxx") + + # Verify that we can only shift if we've traversed only a single name + r = self._createRequest(extra_env={"PATH_INFO": "/folder/item"}) + publish(r, handle_errors=0) + self.assertRaises(ValueError, r.shiftNameToApplication) + + + +class TestHTTPResponse(unittest.TestCase): + + def testInterface(self): + rp = HTTPResponse() + verifyObject(IHTTPResponse, rp) + verifyObject(IHTTPApplicationResponse, rp) + verifyObject(IResponse, rp) + + def _createResponse(self): + response = HTTPResponse() + return response + + def _parseResult(self, response): + return dict(response.getHeaders()), ''.join(response.consumeBody()) + + def _getResultFromResponse(self, body, charset='utf-8', headers=None): + response = self._createResponse() + assert(charset == 'utf-8') + if headers is not None: + for hdr, val in headers.iteritems(): + response.setHeader(hdr, val) + response.setResult(body) + return self._parseResult(response) + + def testWrite_noContentLength(self): + response = self._createResponse() + # We have to set all the headers ourself, we choose not to provide a + # content-length header + response.setHeader('Content-Type', 'text/plain;charset=us-ascii') + + # Output the data + data = 'a'*10 + response.setResult(StrResult(data)) + + headers, body = self._parseResult(response) + # Check that the data have been written, and that the header + # has been preserved + self.assertEqual(headers['Content-Type'], 'text/plain;charset=us-ascii') + self.assertEqual(body, data) + + # Make sure that no Content-Length header was added + self.assert_('Content-Length' not in headers) + + def testContentLength(self): + eq = self.failUnlessEqual + + headers, body = self._getResultFromResponse("test", "utf-8", + {"content-type": "text/plain"}) + eq("4", headers["Content-Length"]) + eq("test", body) + + headers, body = self._getResultFromResponse( + u'\u0442\u0435\u0441\u0442', "utf-8", + {"content-type": "text/plain"}) + eq("8", headers["Content-Length"]) + eq('\xd1\x82\xd0\xb5\xd1\x81\xd1\x82', body) + + def testContentType(self): + eq = self.failUnlessEqual + + headers, body = self._getResultFromResponse("test", "utf-8") + eq("", headers.get("Content-Type", "")) + eq("test", body) + + headers, body = self._getResultFromResponse(u"test", + headers={"content-type": "text/plain"}) + eq("text/plain;charset=utf-8", headers["Content-Type"]) + eq("test", body) + + headers, body = self._getResultFromResponse(u"test", "utf-8", + {"content-type": "text/html"}) + eq("text/html;charset=utf-8", headers["Content-Type"]) + eq("test", body) + + headers, body = self._getResultFromResponse(u"test", "utf-8", + {"content-type": "text/plain;charset=cp1251"}) + eq("text/plain;charset=cp1251", headers["Content-Type"]) + eq("test", body) + + headers, body = self._getResultFromResponse("test", "utf-8", + {"content-type": "image/gif"}) + eq("image/gif", headers["Content-Type"]) + eq("test", body) + + def _getCookieFromResponse(self, cookies): + # Shove the cookies through request, parse the Set-Cookie header + # and spit out a list of headers for examination + response = self._createResponse() + for name, value, kw in cookies: + response.setCookie(name, value, **kw) + response.setResult('test') + return [header[1] + for header in response.getHeaders() + if header[0] == "Set-Cookie"] + + def testSetCookie(self): + c = self._getCookieFromResponse([ + ('foo', 'bar', {}), + ]) + self.failUnless('foo=bar;' in c, 'foo=bar; not in %r' % c) + + c = self._getCookieFromResponse([ + ('foo', 'bar', {}), + ('alpha', 'beta', {}), + ]) + self.failUnless('foo=bar;' in c) + self.failUnless('alpha=beta;' in c) + + c = self._getCookieFromResponse([ + ('sign', u'\N{BIOHAZARD SIGN}', {}), + ]) + self.failUnless(r'sign="\342\230\243";' in c) + + self.assertRaises( + CookieError, + self._getCookieFromResponse, + [('path', 'invalid key', {}),] + ) + + c = self._getCookieFromResponse([ + ('foo', 'bar', { + 'Expires': 'Sat, 12 Jul 2014 23:26:28 GMT', + 'domain': 'example.com', + 'pAth': '/froboz', + 'max_age': 3600, + 'comment': u'blah;\N{BIOHAZARD SIGN}?', + 'seCure': True, + }), + ])[0] + self.failUnless('foo=bar;' in c) + self.failUnless('expires=Sat, 12 Jul 2014 23:26:28 GMT;' in c, repr(c)) + self.failUnless('Domain=example.com;' in c) + self.failUnless('Path=/froboz;' in c) + self.failUnless('Max-Age=3600;' in c) + self.failUnless('Comment=blah%3B%E2%98%A3?;' in c, repr(c)) + self.failUnless('secure;' in c) + + c = self._getCookieFromResponse([('foo', 'bar', {'secure': False})])[0] + self.failUnless('foo=bar;' in c) + self.failIf('secure' in c) + + +def test_suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(ConcreteHTTPTests)) + suite.addTest(unittest.makeSuite(TestHTTPResponse)) + suite.addTest(unittest.makeSuite(HTTPInputStreamTests)) + return suite + + +if __name__ == '__main__': + unittest.main() |