diff options
author | Marius Gedminas <marius@gedmin.as> | 2004-05-12 20:00:38 +0000 |
---|---|---|
committer | Marius Gedminas <marius@gedmin.as> | 2004-05-12 20:00:38 +0000 |
commit | 9a4cf5a13af8a9e36c36fff589d919c6e55aa277 (patch) | |
tree | d0dee5f999bde1eda337bac2ac105818aba83148 | |
download | zope-publisher-monolithic-zope3-mgedmin-security.tar.gz |
Made BaseRequest an IParticipation and replaced request.user withmonolithic-zope3-mgedmin-security
request.principal everywhere.
-rw-r--r-- | base.py | 502 | ||||
-rw-r--r-- | browser.py | 786 | ||||
-rw-r--r-- | http.py | 1029 | ||||
-rw-r--r-- | interfaces/__init__.py | 416 | ||||
-rw-r--r-- | tests/test_http.py | 485 |
5 files changed, 3218 insertions, 0 deletions
@@ -0,0 +1,502 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Base implementations of the Publisher objects + +Specifically, 'BaseRequest', 'BaseResponse', and 'DefaultPublication' are +specified here. + +$Id: base.py,v 1.19 2004/05/06 10:12:13 philikon Exp $ +""" +import traceback +from cStringIO import StringIO + +from zope.interface import implements +from zope.interface.common.mapping import IReadMapping, IEnumerableMapping +from zope.exceptions import NotFoundError + +from zope.publisher.interfaces import IPublication +from zope.publisher.interfaces import NotFound, DebugError, Unauthorized +from zope.publisher.interfaces import IRequest, IResponse +from zope.publisher.publish import mapply + +_marker = object() + +class BaseResponse(object): + """Base Response Class + """ + + __slots__ = ( + '_body', # The response body + '_outstream', # The output stream + '_request', # The associated request (if any) + ) + + implements(IResponse) + + + def __init__(self, outstream): + self._body = '' + self._outstream = outstream + + def outputBody(self): + 'See IPublisherResponse' + self._outstream.write(self._getBody()) + + def setBody(self, body): + 'See IPublisherResponse' + self._body = body + + # This method is not part of this interface + def _getBody(self): + 'Returns a string representing the currently set body.' + return self._body + + def reset(self): + 'See IPublisherResponse' + self._body = "" + + def handleException(self, exc_info): + 'See IPublisherResponse' + traceback.print_exception( + exc_info[0], exc_info[1], exc_info[2], 100, self) + + def internalError(self): + 'See IPublisherResponse' + pass + + def retry(self): + 'See IPublisherResponse' + return self.__class__(self.outstream) + + def write(self, string): + 'See IApplicationResponse' + self._body += string + +class RequestDataGetter(object): + + implements(IReadMapping) + + def __init__(self, request): + self.__get = getattr(request, self._gettrname) + + def __getitem__(self, name): + return self.__get(name) + + def get(self, name, default=None): + return self.__get(name, default) + + def __contains__(self, key): + lookup = self.get(key, self) + return lookup is not self + + has_key = __contains__ + +class RequestDataMapper(object): + + implements(IEnumerableMapping) + + def __init__(self, request): + self.__map = getattr(request, self._mapname) + + def __getitem__(self, name): + return self.__map[name] + + def get(self, name, default=None): + return self.__map.get(name, default) + + def __contains__(self, key): + lookup = self.get(key, self) + return lookup is not self + + has_key = __contains__ + + def keys(self): + return self.__map.keys() + + def __iter__(self): + return iter(self.keys()) + + def items(self): + return self.__map.items() + + def values(self): + return self.__map.values() + + def __len__(self): + return len(self.__map) + +class RequestDataProperty(object): + + def __init__(self, gettr_class): + self.__gettr_class = gettr_class + + def __get__(self, request, rclass=None): + if request is not None: + return self.__gettr_class(request) + + def __set__(*args): + raise AttributeError, 'Unassignable attribute' + + +class RequestEnvironment(RequestDataMapper): + _mapname = '_environ' + +class BaseRequest(object): + """Represents a publishing request. + + This object provides access to request data. Request data may + vary depending on the protocol used. + + Request objects are created by the object publisher and will be + passed to published objects through the argument name, REQUEST. + + The request object is a mapping object that represents a + collection of variable to value mappings. + """ + + implements(IRequest) + + __slots__ = ( + '_held', # Objects held until the request is closed + '_traversed_names', # The names that have been traversed + '_last_obj_traversed', # Object that was traversed last + '_traversal_stack', # Names to be traversed, in reverse order + '_environ', # The request environment variables + '_response', # The response + '_args', # positional arguments + '_body_instream', # input stream + '_body', # The request body as a string + '_publication', # publication object + '_presentation_skin', # View skin + '_principal', # request principal, set by publication + 'interaction', # interaction, set by interaction + ) + + environment = RequestDataProperty(RequestEnvironment) + + def __init__(self, body_instream, outstream, environ, response=None, + positional=()): + self._traversal_stack = [] + self._last_obj_traversed = None + self._traversed_names = [] + self._environ = environ + + self._args = positional + if response is None: + self._response = self._createResponse(outstream) + else: + self._response = response + self._response._request = self + + self._body_instream = body_instream + self._held = () + self._principal = None + self.interaction = None + + def setPrincipal(self, principal): + self._principal = principal + + principal = property(lambda self: self._principal) + + def _getPublication(self): + 'See IPublisherRequest' + return getattr(self, '_publication', None) + + publication = property(_getPublication) + + + def processInputs(self): + 'See IPublisherRequest' + # Nothing to do here + + def retry(self): + 'See IPublisherRequest' + raise TypeError('Retry is not supported') + + def setPublication(self, pub): + 'See IPublisherRequest' + self._publication = pub + + def supportsRetry(self): + 'See IPublisherRequest' + return 0 + + def traverse(self, object): + 'See IPublisherRequest' + + publication = self.publication + + traversal_stack = self._traversal_stack + traversed_names = self._traversed_names + + self._last_obj_traversed = object + + prev_object = None + while True: + + if object is not prev_object: + # Invoke hooks (but not more than once). + publication.callTraversalHooks(self, object) + + prev_object = object + + if traversal_stack: + # Traverse to the next step. + entry_name = traversal_stack.pop() + traversed_names.append(entry_name) + subobject = publication.traverseName( + self, object, entry_name) + self._last_obj_traversed = object = subobject + else: + # Finished traversal. + break + + return object + + def close(self): + 'See IPublicationRequest' + self._held = None + self._response = None + self._body_instream = None + self._publication = None + + def getPositionalArguments(self): + 'See IPublicationRequest' + return self._args + + def _getResponse(self): + return self._response + + response = property(_getResponse) + + def getTraversalStack(self): + 'See IPublicationRequest' + return list(self._traversal_stack) # Return a copy + + def hold(self, object): + 'See IPublicationRequest' + self._held = self._held + (object,) + + def setTraversalStack(self, stack): + 'See IPublicationRequest' + self._traversal_stack[:] = list(stack) + + def setPresentationSkin(self, skin): + 'See IPublicationRequest' + self._presentation_skin = skin + + def getPresentationSkin(self): + 'See IPresentationRequest' + return getattr(self, '_presentation_skin', '') + + def _getBody(self): + body = getattr(self, '_body', None) + if body is None: + s = self._body_instream + if s is None: + return None # XXX what should be returned here? + p = s.tell() + s.seek(0) + body = s.read() + s.seek(p) + self._body = body + return body + + body = property(_getBody) + + def _getBodyFile(self): + 'See IApplicationRequest' + return self._body_instream + + bodyFile = property(_getBodyFile) + + def __len__(self): + 'See Interface.Common.Mapping.IEnumerableMapping' + return len(self.keys()) + + def items(self): + 'See Interface.Common.Mapping.IEnumerableMapping' + result = [] + get = self.get + for k in self.keys(): + result.append((k, get(k))) + return result + + def keys(self): + 'See Interface.Common.Mapping.IEnumerableMapping' + return self._environ.keys() + + def __iter__(self): + return iter(self.keys()) + + def values(self): + 'See Interface.Common.Mapping.IEnumerableMapping' + result = [] + get = self.get + for k in self.keys(): + result.append(get(k)) + return result + + def __getitem__(self, key): + 'See Interface.Common.Mapping.IReadMapping' + result = self.get(key, _marker) + if result is _marker: + raise KeyError, key + else: + return result + + def get(self, key, default=None): + 'See Interface.Common.Mapping.IReadMapping' + + result = self._environ.get(key, self) + if result is not self: return result + + return default + + def __contains__(self, key): + 'See Interface.Common.Mapping.IReadMapping' + lookup = self.get(key, self) + return lookup is not self + + has_key = __contains__ + + def _createResponse(self, outstream): + # Should be overridden by subclasses + return BaseResponse(outstream) + + def __nonzero__(self): + # This is here to avoid calling __len__ for boolean tests + return 1 + + def __str__(self): + L1 = self.items() + L1.sort() + return "\n".join(map(lambda item: "%s:\t%s" % item, L1)) + + def _setupPath_helper(self, attr): + path = self.get(attr, "/").strip() + if path.endswith('/'): + # Remove trailing backslash, so that we will not get an empty + # last entry when splitting the path. + path = path[:-1] + self._endswithslash = True + else: + self._endswithslash = False + + clean = [] + for item in path.split('/'): + if not item or item == '.': + continue + elif item == '..': + try: + del clean[-1] + except IndexError: + raise NotFoundError('..') + else: clean.append(item) + + clean.reverse() + self.setTraversalStack(clean) + + self._path_suffix = None + +class TestRequest(BaseRequest): + + __slots__ = ('_presentation_type', ) + + def __init__(self, path, body_instream=None, outstream=None, environ=None): + if environ is None: + environ = {} + environ['PATH_INFO'] = path + if body_instream is None: + body_instream = StringIO('') + if outstream is None: + outstream = StringIO() + + super(TestRequest, self).__init__(body_instream, outstream, environ) + + +class DefaultPublication(object): + """A stub publication. + + This works just like Zope2's ZPublisher. It rejects any name + starting with an underscore and any objects (specifically: method) + that doesn't have a docstring. + """ + implements(IPublication) + + require_docstrings = True + + def __init__(self, app): + self.app = app + + def beforeTraversal(self, request): + # Lop off leading and trailing empty names + stack = request.getTraversalStack() + while stack and not stack[-1]: + stack.pop() # toss a trailing empty name + while stack and not stack[0]: + stack.pop(0) # toss a leading empty name + request.setTraversalStack(stack) + + def getApplication(self, request): + return self.app + + def callTraversalHooks(self, request, ob): + pass + + def traverseName(self, request, ob, name, check_auth=1): + if name.startswith('_'): + raise Unauthorized("Name %s begins with an underscore" % `name`) + if hasattr(ob, name): + subob = getattr(ob, name) + else: + try: + subob = ob[name] + except (KeyError, IndexError, + TypeError, AttributeError): + raise NotFound(ob, name, request) + if self.require_docstrings and not getattr(subob, '__doc__', None): + raise DebugError(subob, 'Missing or empty doc string') + return subob + + def getDefaultTraversal(self, request, ob): + return ob, () + + def afterTraversal(self, request, ob): + pass + + def callObject(self, request, ob): + return mapply(ob, request.getPositionalArguments(), request) + + def afterCall(self, request, ob): + pass + + def handleException(self, object, request, exc_info, retry_allowed=1): + # Let the response handle it as best it can. + request.response.reset() + request.response.handleException(exc_info) + + +class TestPublication(DefaultPublication): + + def traverseName(self, request, ob, name, check_auth=1): + if hasattr(ob, name): + subob = getattr(ob, name) + else: + try: + subob = ob[name] + except (KeyError, IndexError, + TypeError, AttributeError): + raise NotFound(ob, name, request) + return subob diff --git a/browser.py b/browser.py new file mode 100644 index 0000000..3cde729 --- /dev/null +++ b/browser.py @@ -0,0 +1,786 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Browser-specific Publisher classes + +Here we define the specific 'BrowserRequest' and 'BrowserResponse' class. The +big improvement of the 'BrowserRequest' to 'HTTPRequest' is that is can handle +HTML form data and convert them into a Python-native format. Even file data is +packaged into a nice, Python-friendly 'FileUpload' object. + +$Id: browser.py,v 1.31 2004/05/06 10:12:13 philikon Exp $ +""" +import re +from types import ListType, TupleType, StringType, StringTypes +from cgi import FieldStorage, escape + +from zope.interface import implements +from zope.i18n.interfaces import IUserPreferredLanguages +from zope.i18n.interfaces import IUserPreferredCharsets +from zope.publisher.interfaces.browser import IBrowserRequest +from zope.publisher.interfaces.browser import IBrowserApplicationRequest + +from zope.publisher.http import HTTPRequest, HTTPResponse +from zope.publisher.base import BaseRequest + +__metaclass__ = type # All classes are new style when run with Python 2.2+ + +__ArrayTypes = (ListType, TupleType) + +start_of_header_search=re.compile('(<head[^>]*>)', re.I).search +base_re_search=re.compile('(<base.*?>)',re.I).search +isRelative = re.compile("[-_.!~*a-zA-z0-9'()@&=+$,]+(/|$)").match +newlines = re.compile('\r\n|\n\r|\r') + + +def is_text_html(content_type): + return content_type.startswith('text/html') + +# Flag Constants +SEQUENCE = 1 +DEFAULT = 2 +RECORD = 4 +RECORDS = 8 +REC = RECORD | RECORDS +CONVERTED = 32 +DEFAULTABLE_METHODS = 'GET', 'POST', 'HEAD' + + +def field2string(v): + if hasattr(v, 'read'): + return v.read() + return str(v) + +def field2text(v, nl=newlines): + return nl.sub("\n", field2string(v)) + +def field2required(v): + v = field2string(v) + if not v.strip(): + raise ValueError, 'No input for required field<p>' + return v + +def field2int(v): + if isinstance(v, __ArrayTypes): + return map(field2int, v) + v = field2string(v) + if not v: + raise ValueError, 'Empty entry when <strong>integer</strong> expected' + try: + return int(v) + except ValueError: + raise ValueError, "An integer was expected in the value '%s'" % v + +def field2float(v): + if isinstance(v, __ArrayTypes): + return map(field2float, v) + v = field2string(v) + if not v: + raise ValueError, ( + 'Empty entry when <strong>floating-point number</strong> expected') + try: + return float(v) + except ValueError: + raise ValueError, ( + "A floating-point number was expected in the value '%s'" % v + ) + +def field2long(v): + if isinstance(v, __ArrayTypes): + return map(field2long, v) + v = field2string(v) + + # handle trailing 'L' if present. + if v and v[-1].upper() == 'L': + v = v[:-1] + if not v: + raise ValueError, 'Empty entry when <strong>integer</strong> expected' + try: + return long(v) + except ValueError: + raise ValueError, "A long integer was expected in the value '%s'" % v + +def field2tokens(v): + return field2string(v).split() + +def field2lines(v): + if isinstance(v, __ArrayTypes): + return [str(item) for item in v] + return field2text(v).splitlines() + +def field2boolean(v): + return bool(v) + +type_converters = { + 'float': field2float, + 'int': field2int, + 'long': field2long, + 'string': field2string, + 'required': field2required, + 'tokens': field2tokens, + 'lines': field2lines, + 'text': field2text, + 'boolean': field2boolean, + } + +get_converter = type_converters.get + +def registerTypeConverter(field_type, converter, replace=False): + """Add a custom type converter to the registry. + + o If 'replace' is not true, raise a KeyError if a converter is + already registered for 'field_type'. + """ + existing = type_converters.get(field_type) + + if existing is not None and not replace: + raise KeyError, 'Existing converter for field_type: %s' % field_type + + type_converters[field_type] = converter + + +isCGI_NAME = { + # These fields are placed in request.environ instead of request.form. + 'SERVER_SOFTWARE' : 1, + 'SERVER_NAME' : 1, + 'GATEWAY_INTERFACE' : 1, + 'SERVER_PROTOCOL' : 1, + 'SERVER_PORT' : 1, + 'REQUEST_METHOD' : 1, + 'PATH_INFO' : 1, + 'PATH_TRANSLATED' : 1, + 'SCRIPT_NAME' : 1, + 'QUERY_STRING' : 1, + 'REMOTE_HOST' : 1, + 'REMOTE_ADDR' : 1, + 'AUTH_TYPE' : 1, + 'REMOTE_USER' : 1, + 'REMOTE_IDENT' : 1, + 'CONTENT_TYPE' : 1, + 'CONTENT_LENGTH' : 1, + 'SERVER_URL': 1, + }.has_key + +hide_key={ + 'HTTP_AUTHORIZATION':1, + 'HTTP_CGI_AUTHORIZATION': 1, + }.has_key + + +class Record: + + def __getattr__(self, key, default=None): + if key in ('get', 'keys', 'items', 'values', 'copy', + 'has_key', '__contains__'): + return getattr(self.__dict__, key) + raise AttributeError, key + + def __getitem__(self, key): + return self.__dict__[key] + + def __str__(self): + L1 = self.__dict__.items() + L1.sort() + return ", ".join(["%s: %s" % item for item in L1]) + + def __repr__(self): + L1 = self.__dict__.items() + L1.sort() + return ', '.join(["%s: %s" % (key, repr(value)) for key, value in L1]) + + +class BrowserRequest(HTTPRequest): + implements(IBrowserRequest, IBrowserApplicationRequest) + + __slots__ = ( + 'form', # Form data + 'charsets', # helper attribute + '__meth', + '__tuple_items', + '__defaults', + ) + + # Set this to True in a subclass to redirect GET requests when the + # effective and actual URLs differ. + use_redirect = False + + def __init__(self, body_instream, outstream, environ, response=None): + self.form = {} + self.charsets = None + super(BrowserRequest, self).__init__( + body_instream, outstream, environ, response) + + + def _createResponse(self, outstream): + # Should be overridden by subclasses + return BrowserResponse(outstream) + + def _decode(self, text): + """Try to decode the text using one of the available charsets.""" + if self.charsets is None: + envadapter = IUserPreferredCharsets(self) + self.charsets = envadapter.getPreferredCharsets() + for charset in self.charsets: + try: + text = unicode(text, charset) + break + except UnicodeError: + pass + return text + + def processInputs(self): + 'See IPublisherRequest' + + if self.method != 'GET': + # Process self.form if not a GET request. + fp = self._body_instream + else: + fp = None + + fs = FieldStorage(fp=fp, environ=self._environ, keep_blank_values=1) + + fslist = getattr(fs, 'list', None) + if fslist is not None: + self.__meth = None + self.__tuple_items = {} + self.__defaults = {} + + # process all entries in the field storage (form) + for item in fslist: + self.__processItem(item) + + if self.__defaults: + self.__insertDefaults() + + if self.__tuple_items: + self.__convertToTuples() + + if self.__meth: + self.setPathSuffix((self.__meth,)) + + _typeFormat = re.compile('([a-zA-Z][a-zA-Z0-9_]+|\\.[xy])$') + + def __processItem(self, item): + """Process item in the field storage.""" + + # Check whether this field is a file upload object + # Note: A field exists for files, even if no filename was + # passed in and no data was uploaded. Therefore we can only + # tell by the empty filename that no upload was made. + key = item.name + if (hasattr(item, 'file') and hasattr(item, 'filename') + and hasattr(item,'headers')): + if (item.file and + (item.filename is not None and item.filename != '' + # RFC 1867 says that all fields get a content-type. + # or 'content-type' in map(lower, item.headers.keys()) + )): + item = FileUpload(item) + else: + item = item.value + + flags = 0 + converter = None + + # Loop through the different types and set + # the appropriate flags + # Syntax: var_name:type_name + + # We'll search from the back to the front. + # We'll do the search in two steps. First, we'll + # do a string search, and then we'll check it with + # a re search. + + while key: + pos = key.rfind(":") + if pos < 0: + break + match = self._typeFormat.match(key, pos + 1) + if match is None: + break + + key, type_name = key[:pos], key[pos + 1:] + + # find the right type converter + c = get_converter(type_name, None) + + if c is not None: + converter = c + flags |= CONVERTED + elif type_name == 'list': + flags |= SEQUENCE + elif type_name == 'tuple': + self.__tuple_items[key] = 1 + flags |= SEQUENCE + elif (type_name == 'method' or type_name == 'action'): + if key: + self.__meth = key + else: + self.__meth = item + elif (type_name == 'default_method' + or type_name == 'default_action') and not self.__meth: + if key: + self.__meth = key + else: + self.__meth = item + elif type_name == 'default': + flags |= DEFAULT + elif type_name == 'record': + flags |= RECORD + elif type_name == 'records': + flags |= RECORDS + elif type_name == 'ignore_empty' and not item: + # skip over empty fields + return + + # Filter out special names from form: + if not (isCGI_NAME(key) or key.startswith('HTTP_')): + # Make it unicode + key = self._decode(key) + if type(item) == StringType: + item = self._decode(item) + + if flags: + self.__setItemWithType(key, item, flags, converter) + else: + self.__setItemWithoutType(key, item) + + def __setItemWithoutType(self, key, item): + """Set item value without explicit type.""" + form = self.form + if key not in form: + form[key] = item + else: + found = form[key] + if isinstance(found, list): + found.append(item) + else: + form[key] = [found, item] + + def __setItemWithType(self, key, item, flags, converter): + """Set item value with explicit type.""" + #Split the key and its attribute + if flags & REC: + key, attr = self.__splitKey(key) + + # defer conversion + if flags & CONVERTED: + try: + item = converter(item) + except: + if item or flags & DEFAULT or key not in self.__defaults: + raise + item = self.__defaults[key] + if flags & RECORD: + item = getattr(item, attr) + elif flags & RECORDS: + item = getattr(item[-1], attr) + + # Determine which dictionary to use + if flags & DEFAULT: + form = self.__defaults + else: + form = self.form + + # Insert in dictionary + if key not in form: + if flags & SEQUENCE: + item = [item] + if flags & RECORD: + r = form[key] = Record() + setattr(r, attr, item) + elif flags & RECORDS: + r = Record() + setattr(r, attr, item) + form[key] = [r] + else: + form[key] = item + else: + r = form[key] + if flags & RECORD: + if not flags & SEQUENCE: + setattr(r, attr, item) + else: + if not hasattr(r, attr): + setattr(r, attr, [item]) + else: + getattr(r, attr).append(item) + elif flags & RECORDS: + last = r[-1] + if not hasattr(last, attr): + if flags & SEQUENCE: + item = [item] + setattr(last, attr, item) + else: + if flags & SEQUENCE: + getattr(last, attr).append(item) + else: + new = Record() + setattr(new, attr, item) + r.append(new) + else: + if isinstance(r, list): + r.append(item) + else: + form[key] = [r, item] + + def __splitKey(self, key): + """Split the key and its attribute.""" + i = key.rfind(".") + if i >= 0: + return key[:i], key[i + 1:] + return key, "" + + def __convertToTuples(self): + """Convert form values to tuples.""" + form = self.form + + for key in self.__tuple_items: + if key in form: + form[key] = tuple(form[key]) + else: + k, attr = self.__splitKey(key) + + # remove any type_names in the attr + i = attr.find(":") + if i >= 0: + attr = attr[:i] + + if k in form: + item = form[k] + if isinstance(item, Record): + if hasattr(item, attr): + setattr(item, attr, tuple(getattr(item, attr))) + else: + for v in item: + if hasattr(v, attr): + setattr(v, attr, tuple(getattr(v, attr))) + + def __insertDefaults(self): + """Insert defaults into form dictionary.""" + form = self.form + + for keys, values in self.__defaults.iteritems(): + if not keys in form: + form[keys] = values + else: + item = form[keys] + if isinstance(values, Record): + for k, v in values.items(): + if not hasattr(item, k): + setattr(item, k, v) + elif isinstance(values, list): + for val in values: + if isinstance(val, Record): + for k, v in val.items(): + for r in item: + if not hasattr(r, k): + setattr(r, k, v) + elif not val in item: + item.append(val) + + def traverse(self, object): + 'See IPublisherRequest' + + ob = super(BrowserRequest, self).traverse(object) + method = self.method + + base_needed = 0 + if self._path_suffix: + # We had a :method variable, so we need to set the base, + # but we don't look for default documents any more. + base_needed = 1 + redirect = 0 + elif method in DEFAULTABLE_METHODS: + # We need to check for default documents + publication = self.publication + + nsteps = 0 + ob, add_steps = publication.getDefaultTraversal(self, ob) + while add_steps: + nsteps += len(add_steps) + add_steps = list(add_steps) + add_steps.reverse() + self.setTraversalStack(add_steps) + ob = super(BrowserRequest, self).traverse(ob) + ob, add_steps = publication.getDefaultTraversal(self, ob) + + if nsteps > self._endswithslash: + base_needed = 1 + redirect = self.use_redirect and method == 'GET' + + + if base_needed: + url = self.getURL() + response = self.response + if redirect: + response.redirect(url) + return '' + elif not response.getBase(): + response.setBase(url) + + return ob + + def keys(self): + 'See Interface.Common.Mapping.IEnumerableMapping' + d = {} + d.update(self._environ) + d.update(self._cookies) + d.update(self.form) + return d.keys() + + + def get(self, key, default=None): + 'See Interface.Common.Mapping.IReadMapping' + + result = self.form.get(key, self) + if result is not self: return result + + result = self._cookies.get(key, self) + if result is not self: return result + + result = self._environ.get(key, self) + if result is not self: return result + + return default + + +class FileUpload(object): + '''File upload objects + + File upload objects are used to represent file-uploaded data. + + File upload objects can be used just like files. + + In addition, they have a 'headers' attribute that is a dictionary + containing the file-upload headers, and a 'filename' attribute + containing the name of the uploaded file. + ''' + + def __init__(self, aFieldStorage): + + file = aFieldStorage.file + if hasattr(file, '__methods__'): + methods = file.__methods__ + else: + methods = ['close', 'fileno', 'flush', 'isatty', + 'read', 'readline', 'readlines', 'seek', + 'tell', 'truncate', 'write', 'writelines'] + + d = self.__dict__ + for m in methods: + if hasattr(file,m): + d[m] = getattr(file,m) + + self.headers = aFieldStorage.headers + self.filename = aFieldStorage.filename + +class RedirectingBrowserRequest(BrowserRequest): + """Browser requests that redirect when the actual and effective URLs differ + """ + + use_redirect = True + +class TestRequest(BrowserRequest): + """Browser request with a constructor convenient for testing + """ + + def __init__(self, + body_instream=None, outstream=None, environ=None, form=None, + skin='default', + **kw): + + _testEnv = { + 'SERVER_URL': 'http://127.0.0.1', + 'HTTP_HOST': '127.0.0.1', + 'CONTENT_LENGTH': '0', + 'GATEWAY_INTERFACE': 'TestFooInterface/1.0', + } + + if environ: + _testEnv.update(environ) + if kw: + _testEnv.update(kw) + if body_instream is None: + from StringIO import StringIO + body_instream = StringIO('') + + if outstream is None: + from StringIO import StringIO + outstream = StringIO() + + super(TestRequest, self).__init__(body_instream, outstream, _testEnv) + if form: + self.form.update(form) + + self.setPresentationSkin(skin) + + def setPrincipal(self, principal): + # HTTPRequest needs to notify the HTTPTask of the username. + # We don't want to have to stub HTTPTask in the tests. + BaseRequest.setPrincipal(self, principal) + +class BrowserResponse(HTTPResponse): + """Browser response + """ + + __slots__ = ( + '_base', # The base href + ) + + def setBody(self, body): + """Sets the body of the response + + Sets the return body equal to the (string) argument "body". Also + updates the "content-length" return header and sets the status to + 200 if it has not already been set. + """ + if not isinstance(body, StringTypes): + body = unicode(body) + + if 'content-type' not in self._headers: + c = (self.__isHTML(body) and 'text/html' or 'text/plain') + if self._charset is not None: + c += ';charset=' + self._charset + self.setHeader('content-type', c) + + body = self.__insertBase(body) + self._body = body + self._updateContentLength() + if not self._status_set: + self.setStatus(200) + + def __isHTML(self, str): + s = str.strip().lower() + return ((s.startswith('<html') and (s[5:6] in ' >')) + or s.startswith('<!doctype html')) + + + def __wrapInHTML(self, title, content): + t = escape(title) + return ( + "<html><head><title>%s</title></head>\n" + "<body><h2>%s</h2>\n" + "%s\n" + "</body></html>\n" % + (t, t, content) + ) + + + def __insertBase(self, body): + # Only insert a base tag if content appears to be html. + content_type = self.getHeader('content-type', '') + if content_type and not is_text_html(content_type): + return body + + if getattr(self, '_base', ''): + if body: + match = start_of_header_search(body) + if match is not None: + index = match.start(0) + len(match.group(0)) + ibase = base_re_search(body) + if ibase is None: + body = ('%s\n<base href="%s" />\n%s' % + (body[:index], self._base, body[index:])) + return body + + def getBase(self): + return getattr(self, '_base', '') + + def setBase(self, base): + self._base = base + + def redirect(self, location, status=None): + base = getattr(self, '_base', '') + if base and isRelative(str(location)): + l = base.rfind('/') + if l >= 0: + base = base[:l+1] + else: + base += '/' + location = base + location + + # XXX: HTTP redirects must provide an absolute location, see + # http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.30 + # So, what if location is relative and base is unknown? Uncomment + # the following and you'll see that it actually happens. + # + # if isRelative(str(location)): + # raise AssertionError('Cannot determine absolute location') + + return super(BrowserResponse, self).redirect(location, status) + + def reset(self): + super(BrowserResponse, self).reset() + self._base = '' + +def normalize_lang(lang): + lang = lang.strip().lower() + lang = lang.replace('_', '-') + lang = lang.replace(' ', '') + return lang + +class BrowserLanguages: + + implements(IUserPreferredLanguages) + + def __init__(self, request): + self.request = request + + def getPreferredLanguages(self): + '''See interface IUserPreferredLanguages''' + request = self.request + accept_langs = request.get('HTTP_ACCEPT_LANGUAGE', '').split(',') + + # Normalize lang strings + accept_langs = map(normalize_lang, accept_langs) + # Then filter out empty ones + accept_langs = filter(None, accept_langs) + + length = len(accept_langs) + accepts = [] + + for index, lang in enumerate(accept_langs): + l = lang.split(';', 2) + + quality = None + + if len(l) == 2: + q = l[1] + if q.startswith('q='): + q = q.split('=', 2)[1] + quality = float(q) + else: + # If not supplied, quality defaults to 1 + quality = 1.0 + + if quality == 1.0: + # ... but we use 1.9 - 0.001 * position to + # keep the ordering between all items with + # 1.0 quality, which may include items with no quality + # defined, and items with quality defined as 1. + quality = 1.9 - (0.001 * index) + + accepts.append((quality, l[0])) + + # Filter langs with q=0, which means + # unwanted lang according to the spec + # See: http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.4 + accepts = filter(lambda acc: acc[0], accepts) + + accepts.sort() + accepts.reverse() + + return [lang for quality, lang in accepts] + + @@ -0,0 +1,1029 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""HTTP Publisher + +$Id: http.py,v 1.53 2004/05/06 10:12:13 philikon Exp $ +""" +import re, time, random +from urllib import quote, unquote, splitport +from types import StringTypes, ClassType +from cgi import escape + +from zope.interface import implements + +from zope.publisher.interfaces.http import IHTTPCredentials +from zope.publisher.interfaces.http import IHTTPRequest +from zope.publisher.interfaces.http import IHTTPApplicationRequest +from zope.publisher.interfaces.http import IHTTPPublisher + +from zope.publisher.interfaces import Redirect +from zope.publisher.interfaces.http import IHTTPResponse +from zope.publisher.interfaces.http import IHTTPApplicationResponse +from zope.publisher.interfaces.logginginfo import ILoggingInfo +from zope.i18n.interfaces import IUserPreferredCharsets +from zope.i18n.locales import locales, LoadLocaleError + +from zope.publisher.base import BaseRequest, BaseResponse +from zope.publisher.base import RequestDataProperty, RequestDataMapper +from zope.publisher.base import RequestDataGetter + + +# Default Encoding +ENCODING = 'UTF-8' + +class CookieMapper(RequestDataMapper): + _mapname = '_cookies' + +class HeaderGetter(RequestDataGetter): + _gettrname = 'getHeader' + +base64 = None + +def sane_environment(env): + # return an environment mapping which has been cleaned of + # funny business such as REDIRECT_ prefixes added by Apache + # or HTTP_CGI_AUTHORIZATION hacks. + dict = {} + for key, val in env.items(): + while key.startswith('REDIRECT_'): + key = key[9:] + dict[key] = val + if 'HTTP_CGI_AUTHORIZATION' in dict: + dict['HTTP_AUTHORIZATION'] = dict.pop('HTTP_CGI_AUTHORIZATION') + return dict + +# Possible HTTP status responses +status_reasons = { +100: 'Continue', +101: 'Switching Protocols', +102: 'Processing', +200: 'OK', +201: 'Created', +202: 'Accepted', +203: 'Non-Authoritative Information', +204: 'No Content', +205: 'Reset Content', +206: 'Partial Content', +207: 'Multi-Status', +300: 'Multiple Choices', +301: 'Moved Permanently', +302: 'Moved Temporarily', +303: 'See Other', +304: 'Not Modified', +305: 'Use Proxy', +307: 'Temporary Redirect', +400: 'Bad Request', +401: 'Unauthorized', +402: 'Payment Required', +403: 'Forbidden', +404: 'Not Found', +405: 'Method Not Allowed', +406: 'Not Acceptable', +407: 'Proxy Authentication Required', +408: 'Request Time-out', +409: 'Conflict', +410: 'Gone', +411: 'Length Required', +412: 'Precondition Failed', +413: 'Request Entity Too Large', +414: 'Request-URI Too Large', +415: 'Unsupported Media Type', +416: 'Requested range not satisfiable', +417: 'Expectation Failed', +422: 'Unprocessable Entity', +423: 'Locked', +424: 'Failed Dependency', +500: 'Internal Server Error', +501: 'Not Implemented', +502: 'Bad Gateway', +503: 'Service Unavailable', +504: 'Gateway Time-out', +505: 'HTTP Version not supported', +507: 'Insufficient Storage', +} + +status_codes={} + +def init_status_codes(): + # Add mappings for builtin exceptions and + # provide text -> error code lookups. + for key, val in status_reasons.items(): + status_codes[val.replace(' ', '').lower()] = key + status_codes[val.lower()] = key + status_codes[key] = key + status_codes[str(key)] = key + + en = [n.lower() for n in dir(__builtins__) if n.endswith('Error')] + + for name in en: + status_codes[name] = 500 + +init_status_codes() + + +class URLGetter(object): + + __slots__ = "__request" + + def __init__(self, request): + self.__request = request + + def __str__(self): + return self.__request.getURL() + + def __getitem__(self, name): + url = self.get(name, None) + if url is None: + raise KeyError, name + return url + + def get(self, name, default=None): + i = int(name) + try: + if i < 0: + i = -i + return self.__request.getURL(i) + else: + return self.__request.getApplicationURL(i) + except IndexError, v: + if v[0] == i: + return default + raise + +DEFAULT_PORTS = {'http': '80', 'https': '443'} +STAGGER_RETRIES = True + +class HTTPRequest(BaseRequest): + """Model HTTP request data. + + This object provides access to request data. This includes, the + input headers, form data, server data, and cookies. + + Request objects are created by the object publisher and will be + passed to published objects through the argument name, REQUEST. + + The request object is a mapping object that represents a + collection of variable to value mappings. In addition, variables + are divided into four categories: + + - Environment variables + + These variables include input headers, server data, and other + request-related data. The variable names are as <a + href="http://hoohoo.ncsa.uiuc.edu/cgi/env.html">specified</a> + in the <a + href="http://hoohoo.ncsa.uiuc.edu/cgi/interface.html">CGI + specification</a> + + - Form data + + These are data extracted from either a URL-encoded query + string or body, if present. + + - Cookies + + These are the cookie data, if present. + + - Other + + Data that may be set by an application object. + + The form attribute of a request is actually a Field Storage + object. When file uploads are used, this provides a richer and + more complex interface than is provided by accessing form data as + items of the request. See the FieldStorage class documentation + for more details. + + The request object may be used as a mapping object, in which case + values will be looked up in the order: environment variables, + other variables, form data, and then cookies. + """ + implements(IHTTPCredentials, IHTTPRequest, IHTTPApplicationRequest) + + __slots__ = ( + '_auth', # The value of the HTTP_AUTHORIZATION header. + '_cookies', # The request cookies + '_path_suffix', # Extra traversal steps after normal traversal + '_retry_count', # How many times the request has been retried + '_app_names', # The application path as a sequence + '_app_server', # The server path of the application url + '_orig_env', # The original environment + '_endswithslash', # Does the given path end with / + 'method', # The upper-cased request method (REQUEST_METHOD) + '_locale', # The locale for the request + '_vh_root', # Object at the root of the virtual host + ) + + retry_max_count = 3 # How many times we're willing to retry + + def __init__(self, body_instream, outstream, environ, response=None): + super(HTTPRequest, self).__init__( + body_instream, outstream, environ, response) + + self._orig_env = environ + environ = sane_environment(environ) + + if 'HTTP_AUTHORIZATION' in environ: + self._auth = environ['HTTP_AUTHORIZATION'] + del environ['HTTP_AUTHORIZATION'] + else: + self._auth = None + + self.method = environ.get("REQUEST_METHOD", 'GET').upper() + + self._environ = environ + + self.__setupCookies() + self.__setupPath() + self.__setupURLBase() + self._vh_root = None + self.__setupLocale() + + def __setupLocale(self): + # Import here to break import loops + from zope.publisher.browser import BrowserLanguages + + self.response.setCharsetUsingRequest(self) + langs = BrowserLanguages(self).getPreferredLanguages() + for httplang in langs: + parts = (httplang.split('-') + [None, None])[:3] + try: + self._locale = locales.getLocale(*parts) + return + except LoadLocaleError: + # Just try the next combination + pass + else: + # No combination gave us an existing locale, so use the default, + # which is guaranteed to exist + self._locale = locales.getLocale(None, None, None) + + def _getLocale(self): + return self._locale + locale = property(_getLocale) + + def __setupURLBase(self): + get_env = self._environ.get + # Get base info first. This isn't likely to cause + # errors and might be useful to error handlers. + script = get_env('SCRIPT_NAME', '').strip() + + # _script and the other _names are meant for URL construction + self._app_names = filter(None, script.split('/')) + + # get server URL and store it too, since we are already looking it up + server_url = get_env('SERVER_URL', None) + if server_url is not None: + self._app_server = server_url = server_url.strip() + else: + server_url = self.__deduceServerURL() + + if server_url.endswith('/'): + server_url = server_url[:-1] + + # strip off leading /'s of script + while script.startswith('/'): + script = script[1:] + + self._app_server = server_url + + def __deduceServerURL(self): + environ = self._environ + + if (environ.get('HTTPS', '').lower() == "on" or + environ.get('SERVER_PORT_SECURE') == "1"): + protocol = 'https' + else: + protocol = 'http' + + if environ.has_key('HTTP_HOST'): + host = environ['HTTP_HOST'].strip() + hostname, port = splitport(host) + else: + hostname = environ.get('SERVER_NAME', '').strip() + port = environ.get('SERVER_PORT', '') + + if port and port != DEFAULT_PORTS.get(protocol): + host = hostname + ':' + port + else: + host = hostname + + return '%s://%s' % (protocol, host) + + _cookieFormat = re.compile('[\x00- ]*' + # Cookie name + '([^\x00- ;,="]+)=' + # Cookie value (either correct quoted or MSIE) + '(?:"([^"]*)"|([^\x00- ;,"]*))' + '(?:[\x00- ]*[;,])?[\x00- ]*') + + def _parseCookies(self, text, result=None): + """Parse 'text' and return found cookies as 'result' dictionary.""" + + if result is None: + result = {} + + cookieFormat = self._cookieFormat + + pos = 0 + ln = len(text) + while pos < ln: + match = cookieFormat.match(text, pos) + if match is None: + break + + name = unicode(match.group(1), ENCODING) + if name not in result: + value, ms_value = match.group(2, 3) + if value is None: + value = ms_value + result[name] = unicode(value, ENCODING) + + pos = match.end() + + return result + + def __setupCookies(self): + # Cookie values should *not* be appended to existing form + # vars with the same name - they are more like default values + # for names not otherwise specified in the form. + self._cookies = {} + cookie_header = self._environ.get('HTTP_COOKIE', None) + if cookie_header is not None: + self._parseCookies(cookie_header, self._cookies) + + def __setupPath(self): + # The recommendation states that: + # + # Unless there is some compelling reason for a + # particular scheme to do otherwise, translating character sequences + # into UTF-8 (RFC 2279) [3] and then subsequently using the %HH + # encoding for unsafe octets is recommended. + # + # See: http://www.ietf.org/rfc/rfc2718.txt, Section 2.2.5 + self._setupPath_helper("PATH_INFO") + stack = self.getTraversalStack() + stack = [unquote(seg).decode('utf-8') for seg in stack] + self.setTraversalStack(stack) + + def supportsRetry(self): + 'See IPublisherRequest' + count = getattr(self, '_retry_count', 0) + if count < self.retry_max_count: + if STAGGER_RETRIES: + time.sleep(random.uniform(0, 2**(count))) + return True + + def retry(self): + 'See IPublisherRequest' + count = getattr(self, '_retry_count', 0) + self._retry_count = count + 1 + self._body_instream.seek(0) + new_response = self.response.retry() + request = self.__class__( + body_instream=self._body_instream, + outstream=None, + environ=self._orig_env, + response=new_response, + ) + request.setPublication(self.publication) + request._retry_count = self._retry_count + return request + + def traverse(self, object): + 'See IPublisherRequest' + + ob = super(HTTPRequest, self).traverse(object) + if self._path_suffix: + self._traversal_stack = self._path_suffix + ob = super(HTTPRequest, self).traverse(ob) + + return ob + + def getHeader(self, name, default=None, literal=False): + 'See IHTTPRequest' + environ = self._environ + if not literal: + name = name.replace('-', '_').upper() + val = environ.get(name, None) + if val is not None: + return val + if not name.startswith('HTTP_'): + name='HTTP_%s' % name + return environ.get(name, default) + + headers = RequestDataProperty(HeaderGetter) + + def getCookies(self): + 'See IHTTPRequest' + return self._cookies + + cookies = RequestDataProperty(CookieMapper) + + def setPathSuffix(self, steps): + 'See IHTTPRequest' + steps = list(steps) + steps.reverse() + self._path_suffix = steps + + def _authUserPW(self): + 'See IHTTPCredentials' + global base64 + if self._auth: + if self._auth.lower().startswith('basic '): + if base64 is None: + import base64 + name, password = base64.decodestring( + self._auth.split()[-1]).split(':') + return name, password + + def unauthorized(self, challenge): + 'See IHTTPCredentials' + self._response.setHeader("WWW-Authenticate", challenge, True) + self._response.setStatus(401) + + def setPrincipal(self, principal): + 'See IPublicationRequest' + super(HTTPRequest, self).setPrincipal(principal) + + if self.response.http_transaction is not None: + logging_info = ILoggingInfo(principal) + message = logging_info.getLogMessage() + self.response.http_transaction.setAuthUserName(message) + + def _createResponse(self, outstream): + # Should be overridden by subclasses + return HTTPResponse(outstream) + + + def getURL(self, level=0, path_only=False): + names = self._app_names + self._traversed_names + if level: + if level > len(names): + raise IndexError, level + names = names[:-level] + # See: http://www.ietf.org/rfc/rfc2718.txt, Section 2.2.5 + names = [quote(name.encode("utf-8"), safe='/+@') for name in names] + + if path_only: + if not names: + return '/' + return '/' + '/'.join(names) + else: + if not names: + return self._app_server + return "%s/%s" % (self._app_server, '/'.join(names)) + + def getApplicationURL(self, depth=0, path_only=False): + """See IHTTPApplicationRequest""" + if depth: + names = self._traversed_names + if depth > len(names): + raise IndexError, depth + names = self._app_names + names[:depth] + else: + names = self._app_names + + # See: http://www.ietf.org/rfc/rfc2718.txt, Section 2.2.5 + names = [quote(name.encode("utf-8"), safe='/+@') for name in names] + + if path_only: + return names and ('/' + '/'.join(names)) or '/' + else: + return (names and ("%s/%s" % (self._app_server, '/'.join(names))) + or self._app_server) + + def setApplicationServer(self, host, proto='http', port=None): + if port and str(port) != DEFAULT_PORTS.get(proto): + host = '%s:%s' % (host, port) + self._app_server = '%s://%s' % (proto, host) + + def shiftNameToApplication(self): + """Add the name being traversed to the application name + + This is only allowed in the case where the name is the first name. + + A Value error is raise if the shift can't be performed. + """ + if len(self._traversed_names) == 1: + self._app_names.append(self._traversed_names.pop()) + return + + raise ValueError("Can only shift leading traversal " + "names to application names") + + def setVirtualHostRoot(self, names=()): + del self._traversed_names[:] + self._vh_root = self._last_obj_traversed + self._app_names = list(names) + + def getVirtualHostRoot(self): + return self._vh_root + + URL = RequestDataProperty(URLGetter) + + def __repr__(self): + # Returns a *short* string. + return '<%s.%s instance URL=%s>' % ( + self.__class__.__module__, self.__class__.__name__, str(self.URL)) + + def get(self, key, default=None): + 'See Interface.Common.Mapping.IReadMapping' + + result = self._cookies.get(key, self) + if result is not self: return result + + result = self._environ.get(key, self) + if result is not self: return result + + return default + + def keys(self): + 'See Interface.Common.Mapping.IEnumerableMapping' + d = {} + d.update(self._environ) + d.update(self._cookies) + return d.keys() + + +class HTTPResponse(BaseResponse): + implements(IHTTPResponse, IHTTPApplicationResponse) + + __slots__ = ( + '_header_output', # Hook object to collaborate with a server + # for header generation. + '_headers', + '_cookies', + '_accumulated_headers', # Headers that can have multiples + '_wrote_headers', + '_streaming', + '_status', # The response status (usually an integer) + '_reason', # The reason that goes with the status + '_status_set', # Boolean: status explicitly set + '_charset', # String: character set for the output + 'http_transaction', # HTTPTask object + ) + + + def __init__(self, outstream, header_output=None, http_transaction=None): + self._header_output = header_output + self.http_transaction = http_transaction + + super(HTTPResponse, self).__init__(outstream) + self.reset() + + def reset(self): + 'See IResponse' + super(HTTPResponse, self).reset() + self._headers = {} + self._cookies = {} + self._accumulated_headers = [] + self._wrote_headers = False + self._streaming = False + self._status = 599 + self._reason = 'No status set' + self._status_set = False + self._charset = None + + def setHeaderOutput(self, header_output): + self._header_output = header_output + + def setHTTPTransaction(self, http_transaction): + self.http_transaction = http_transaction + + def setStatus(self, status, reason=None): + 'See IHTTPResponse' + if status is None: + status = 200 + else: + if type(status) in StringTypes: + status = status.lower() + if status in status_codes: + status = status_codes[status] + else: + status = 500 + self._status = status + + if reason is None: + if status == 200: + reason = 'Ok' + elif status in status_reasons: + reason = status_reasons[status] + else: + reason = 'Unknown' + self._reason = reason + self._status_set = True + + + def getStatus(self): + 'See IHTTPResponse' + return self._status + + + def setHeader(self, name, value, literal=False): + 'See IHTTPResponse' + + name = str(name) + value = str(value) + + key = name.lower() + if key == 'set-cookie': + self.addHeader(name, value) + else: + name = literal and name or key + self._headers[name]=value + + + def addHeader(self, name, value): + 'See IHTTPResponse' + accum = self._accumulated_headers + accum.append('%s: %s' % (name, value)) + + + def getHeader(self, name, default=None, literal=False): + 'See IHTTPResponse' + key = name.lower() + name = literal and name or key + return self._headers.get(name, default) + + def getHeaders(self): + 'See IHTTPResponse' + result = {} + headers = self._headers + + if (not self._streaming and not ('content-length' in headers) + and not ('transfer-encoding' in headers)): + self._updateContentLength() + + result["X-Powered-By"] = "Zope (www.zope.org), Python (www.python.org)" + + for key, val in headers.items(): + if key.lower() == key: + # only change non-literal header names + key = key.capitalize() + start = 0 + location = key.find('-', start) + while location >= start: + key = "%s-%s" % (key[:location], + key[location+1:].capitalize()) + start = location + 1 + location = key.find('-', start) + result[key] = val + + return result + + + def appendToHeader(self, name, value, delimiter=','): + 'See IHTTPResponse' + headers = self._headers + if name in headers: + h = self._header[name] + h = "%s%s\r\n\t%s" % (h, delimiter, value) + else: + h = value + self.setHeader(name, h) + + + def appendToCookie(self, name, value): + 'See IHTTPResponse' + cookies = self._cookies + if name in cookies: + cookie = cookies[name] + else: + cookie = cookies[name] = {} + if 'value' in cookie: + cookie['value'] = '%s:%s' % (cookie['value'], value) + else: + cookie['value'] = value + + + def expireCookie(self, name, **kw): + 'See IHTTPResponse' + dict = {'max_age':0, 'expires':'Wed, 31-Dec-97 23:59:59 GMT'} + for k, v in kw.items(): + if v is not None: + dict[k] = v + cookies = self._cookies + if name in cookies: + # Cancel previous setCookie(). + del cookies[name] + self.setCookie(name, 'deleted', **dict) + + + def setCookie(self, name, value, **kw): + 'See IHTTPResponse' + cookies = self._cookies + cookie = cookies.setdefault(name, {}) + + for k, v in kw.items(): + if v is not None: + cookie[k] = v + + cookie['value'] = value + + + def getCookie(self, name, default=None): + 'See IHTTPResponse' + return self._cookies.get(name, default) + + + def setCharset(self, charset=None): + 'See IHTTPResponse' + self._charset = charset + + def _updateContentType(self): + if self._charset is not None: + ctype = self.getHeader('content-type', '') + if ctype.startswith("text") and "charset" not in ctype: + self.setHeader('content-type', + ctype + ";charset=" + self._charset) + + def setCharsetUsingRequest(self, request): + 'See IHTTPResponse' + envadapter = IUserPreferredCharsets(request, None) + if envadapter is None: + return + + try: + charset = envadapter.getPreferredCharsets()[0] + except IndexError: + # Exception caused by empty list! This is okay though, since the + # browser just could have sent a '*', which means we can choose + # the encoding, which we do here now. + charset = 'utf-8' + self.setCharset(charset) + + def setBody(self, body): + self._body = unicode(body) + if not self._status_set: + self.setStatus(200) + + def handleException(self, exc_info): + """ + Calls self.setBody() with an error response. + """ + t, v = exc_info[:2] + if isinstance(t, ClassType): + title = tname = t.__name__ + if issubclass(t, Redirect): + self.redirect(v.getLocation()) + return + else: + title = tname = unicode(t) + + # Throwing non-protocol-specific exceptions is a good way + # for apps to control the status code. + self.setStatus(tname) + + body = self._html(title, "A server error occurred." ) + self.setBody(body) + + + def internalError(self): + 'See IPublisherResponse' + self.setStatus(500, u"The engines can't take any more, Jim!") + + + def _html(self, title, content): + t = escape(title) + return ( + u"<html><head><title>%s</title></head>\n" + u"<body><h2>%s</h2>\n" + u"%s\n" + u"</body></html>\n" % + (t, t, content) + ) + + + def retry(self): + """ + Returns a response object to be used in a retry attempt + """ + return self.__class__(self._outstream, + self._header_output) + + def _updateContentLength(self, data=None): + if data is None: + blen = str(len(self._body)) + else: + blen = str(len(data)) + if blen.endswith('L'): + blen = blen[:-1] + self.setHeader('content-length', blen) + + def redirect(self, location, status=None): + """Causes a redirection without raising an error""" + if status is None: + # parse the HTTP version and set default accordingly + if (self._request.get("SERVER_PROTOCOL","HTTP/1.0") < + "HTTP/1.1"): + status=302 + else: + status=303 + + self.setStatus(status) + self.setHeader('Location', location) + return location + + def _cookie_list(self): + cookie_list = [] + for name, attrs in self._cookies.items(): + + # Note that as of May 98, IE4 ignores cookies with + # quoted cookie attr values, so only the value part + # of name=value pairs may be quoted. + + cookie='Set-Cookie: %s="%s"' % (name, attrs['value']) + for name, value in attrs.items(): + name = name.lower() + if name == 'expires': + cookie = '%s; Expires=%s' % (cookie,value) + elif name == 'domain': + cookie = '%s; Domain=%s' % (cookie,value) + elif name == 'path': + cookie = '%s; Path=%s' % (cookie,value) + elif name == 'max_age': + cookie = '%s; Max-Age=%s' % (cookie,value) + elif name == 'comment': + cookie = '%s; Comment=%s' % (cookie,value) + elif name == 'secure' and value: + cookie = '%s; Secure' % cookie + cookie_list.append(cookie) + + # XXX: Should really check size of cookies here! + + return cookie_list + + + def getHeaderText(self, m): + lst = ['Status: %s %s' % (self._status, self._reason)] + items = m.items() + items.sort() + lst.extend(['%s: %s' % i for i in items]) + lst.extend(self._cookie_list()) + lst.extend(self._accumulated_headers) + return ('%s\r\n\r\n' % '\r\n'.join(lst)) + + + def outputHeaders(self): + """This method outputs all headers. + Since it is a final output method, it must take care of all possible + unicode strings and encode them! + """ + if self._charset is None: + self.setCharset('utf-8') + self._updateContentType() + encode = self._encode + headers = self.getHeaders() + # Clean these headers from unicode by possibly encoding them + headers = dict([(encode(key), encode(val)) + for key, val in headers.iteritems()]) + # Cleaning done. + header_output = self._header_output + if header_output is not None: + # Use the IHeaderOutput interface. + header_output.setResponseStatus(self._status, encode(self._reason)) + header_output.setResponseHeaders(headers) + cookie_list = map(encode, self._cookie_list()) + header_output.appendResponseHeaders(cookie_list) + accumulated_headers = map(encode, self._accumulated_headers) + header_output.appendResponseHeaders(accumulated_headers) + else: + # Write directly to outstream. + headers_text = self.getHeaderText(headers) + self._outstream.write(encode(headers_text)) + + def write(self, string): + """See IApplicationResponse + + Return data as a stream + + HTML data may be returned using a stream-oriented interface. + This allows the browser to display partial results while + computation of a response to proceed. + + The published object should first set any output headers or + cookies on the response object and encode the string into + appropriate encoding. + + Note that published objects must not generate any errors + after beginning stream-oriented output. + + """ + self.output(string) + + def output(self, data): + """Output the data to the world. There are a couple of steps we have + to do: + + 1. Check that there is a character encoding for the data. If not, + choose UTF-8. Note that if the charset is None, this is a sign of a + bug! The method setCharsetUsingRequest() specifically sets the + encoding to UTF-8, if none was found in the HTTP header. This + method should always be called when reading the HTTP request. + + 2. Now that the encoding has been finalized, we can output the + headers. + + 3. If the content type is text-based, let's encode the data and send + it also out the door. + """ + if self._charset is None: + self.setCharset('utf-8') + + if self.getHeader('content-type', '').startswith('text'): + data = self._encode(data) + self._updateContentLength(data) + + if not self._wrote_headers: + self.outputHeaders() + self._wrote_headers = True + + self._outstream.write(data) + + + def outputBody(self): + """Outputs the response body.""" + self.output(self._body) + + + def _encode(self, text): + # Any method that calls this method has the responsibility to set + # the _charset variable (if None) to a non-None value (usually UTF-8) + if isinstance(text, unicode): + return text.encode(self._charset) + return text + + +class DefaultPublisher: + implements(IHTTPPublisher) + + def publishTraverse(self, request, name): + 'See IHTTPPublisher' + + return getattr(self, name) + +def sort_charsets(x, y): + if y[1] == 'utf-8': + return 1 + if x[1] == 'utf-8': + return -1 + return cmp(y, x) + + +class HTTPCharsets: + implements(IUserPreferredCharsets) + + def __init__(self, request): + self.request = request + + def getPreferredCharsets(self): + '''See interface IUserPreferredCharsets''' + charsets = [] + sawstar = sawiso88591 = 0 + for charset in self.request.get('HTTP_ACCEPT_CHARSET', '').split(','): + charset = charset.strip().lower() + if charset: + if ';' in charset: + charset, quality = charset.split(';') + if not quality.startswith('q='): + # not a quality parameter + quality = 1.0 + else: + try: + quality = float(quality[2:]) + except ValueError: + continue + else: + quality = 1.0 + if quality == 0.0: + continue + if charset == '*': + sawstar = 1 + if charset == 'iso-8859-1': + sawiso88591 = 1 + charsets.append((quality, charset)) + # Quoting RFC 2616, $14.2: If no "*" is present in an Accept-Charset + # field, then all character sets not explicitly mentioned get a + # quality value of 0, except for ISO-8859-1, which gets a quality + # value of 1 if not explicitly mentioned. + if not sawstar and not sawiso88591: + charsets.append((1.0, 'iso-8859-1')) + # UTF-8 is **always** preferred over anything else. + # Reason: UTF-8 is not specific and can encode the entire unicode + # range , unlike many other encodings. Since Zope can easily use very + # different ranges, like providing a French-Chinese dictionary, it is + # always good to use UTF-8. + charsets.sort(sort_charsets) + return [c[1] for c in charsets] diff --git a/interfaces/__init__.py b/interfaces/__init__.py new file mode 100644 index 0000000..e5f92c1 --- /dev/null +++ b/interfaces/__init__.py @@ -0,0 +1,416 @@ +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Interfaces for the publisher. + +$Id: __init__.py,v 1.16 2004/03/20 13:38:16 philikon Exp $ +""" + +from zope.interface import Interface +from zope.interface import Attribute +from zope.exceptions import Unauthorized +from zope.exceptions import NotFoundError, INotFoundError +from zope.component.interfaces import IPresentationRequest +from zope.interface import implements +from zope.interface.common.mapping import IEnumerableMapping +from zope.interface.common.interfaces import IException +from zope.security.interfaces import IParticipation + +class IPublishingException(IException): + pass + +class PublishingException(Exception): + implements(IPublishingException) + +class ITraversalException(IPublishingException): + pass + +class TraversalException(PublishingException): + implements(ITraversalException) + +class INotFound(INotFoundError, ITraversalException): + def getObject(): + 'Returns the object that was being traversed.' + + def getName(): + 'Returns the name that was being traversed.' + +class NotFound(NotFoundError, TraversalException): + implements(INotFound) + + def __init__(self, ob, name, request=None): + self.ob = ob + self.name = name + + def getObject(self): + return self.ob + + def getName(self): + return self.name + + def __str__(self): + try: + ob = `self.ob` + except: + ob = 'unprintable object' + return 'Object: %s, name: %s' % (ob, `self.name`) + +class IDebugError(ITraversalException): + def getObject(): + 'Returns the object being traversed.' + + def getMessage(): + 'Returns the debug message.' + +class DebugError(TraversalException): + implements(IDebugError) + + def __init__(self, ob, message): + self.ob = ob + self.message = message + + def getObject(self): + return self.ob + + def getMessage(self): + return self.message + + def __str__(self): + return self.message + +class IBadRequest(IPublishingException): + def __str__(): + 'Returns the error message.' + +class BadRequest(PublishingException): + + implements(IBadRequest) + + def __init__(self, message): + self.message = message + + def __str__(self): + return self.message + +class IRedirect(IPublishingException): + def getLocation(): + 'Returns the location.' + +class Redirect(PublishingException): + + implements(IRedirect) + + def __init__(self, location): + self.location = location + + def getLocation(self): + return self.location + + def __str__(self): + return 'Location: %s' % self.location + +class IRetry(IPublishingException): + def getOriginalException(): + 'Returns the original exception object.' + +class Retry(PublishingException): + """Raise this to retry a request. + """ + + implements(IRetry) + + def __init__(self, orig_exc=None): + self.orig_exc = orig_exc + + def getOriginalException(self): + return self.orig_exc + + def __str__(self): + return repr(self.orig_exc) + + +class IExceptionSideEffects(Interface): + '''An exception caught by the publisher is adapted to this so that + it can have persistent side-effects.''' + + def __call__(obj, request, exc_info): + '''Effect persistent side-effects. + + Arguments are: + obj context-wrapped object that was published + request the request + exc_info the exception info being handled + ''' + + +class IPublishTraverse(Interface): + + def publishTraverse(request, name): + """Lookup a name + + The request argument is the publisher request object. + """ + + +class IPublisher(Interface): + + def publish(request): + """Publish a request + + The request must be an IPublisherRequest. + """ + + +class IPublisherResponse(Interface): + """Interface used by the publsher + """ + + def setBody(result): + """Sets the response result value. + """ + + def reset(): + """Resets response state on exceptions. + """ + + def handleException(exc_info): + """Handles an otherwise unhandled exception. + + The publication object gets the first chance to handle an exception, + and if it doesn't have a good way to do it, it defers to the + response. Implementations should set the reponse body. + """ + + def internalError(): + """Called when the exception handler bombs. + + Should report back to the client that an internal error occurred. + """ + + def outputBody(): + """Outputs the response to the client + """ + + def retry(): + """Returns a retry response + + Returns a response suitable for repeating the publication attempt. + """ + + +class IPublication(Interface): + """Object publication framework. + + The responsibility of publication objects is to provide + application hooks for the publishing process. This allows + application-specific tasks, such as connecting to databases, + managing transactions, and setting security contexts to be invoked + during the publishing process. + """ + # The order of the hooks mostly corresponds with the order in which + # they are invoked. + + def beforeTraversal(request): + """Pre-traversal hook. + + This is called *once* before any traversal has been done. + """ + + def getApplication(request): + """Returns the object where traversal should commence. + """ + + def callTraversalHooks(request, ob): + """Invokes any traversal hooks associated with the object. + """ + + def traverseName(request, ob, name, check_auth=1): + """Traverses to the next object. + + If check_auth is set, + performs idenitification, authentication, and authorization. + Returns the subobject. + """ + + def afterTraversal(request, ob): + """Post-traversal hook. + """ + + def callObject(request, ob): + """Call the object, returning the result. + + For GET/POST this means calling it, but for other methods + (including those of WebDAV and FTP) this might mean invoking + a method of an adapter. + """ + + def afterCall(request, ob): + """Post-callObject hook (if it was successful). + """ + + def handleException(object, request, exc_info, retry_allowed=1): + """Handle an exception + + Either: + - sets the body of the response, request.response, or + - raises a Retry exception, or + - throws another exception, which is a Bad Thing. + + Note that this method should not leak, which means that + exc_info must be set to some other value before exiting the method. + """ + + +class IApplicationResponse(Interface): + """Features that support application logic + """ + + def write(string): + """Output a string to the response body. + """ + + +class IPublicationRequest(IPresentationRequest, IParticipation): + """Interface provided by requests to IPublication objects + """ + + response = Attribute("""The request's response object + + Return an IPublisherResponse for the request. + """) + + def close(): + """Release resources held by the request. + """ + + def hold(object): + """Hold a reference to an object until the request is closed + """ + + def getTraversalStack(): + """Return the request traversal stack + + This is a sequence of steps to traverse in reverse order. They + will be traversed from last to first. + """ + + def setTraversalStack(stack): + """Change the traversal stack. + + See getTraversalStack. + """ + + def getPositionalArguments(): + """Return the positional arguments given to the request. + """ + + def setPresentationSkin(skin): + """Set the skin to be used for the request. + + It's up to the publication object to decide this. + """ + + def setPrincipal(principal): + """Set the principal attribute. + + It should be IPrincipal wrapped in it's AuthenticationService's context. + """ + + +class IPublisherRequest(IPublicationRequest): + """Request interface use by the publisher + + The responsibility of requests is to encapsulate protocol + specific details, especially wrt request inputs. + + Request objects also serve as "context" objects, providing + construction of and access to responses and storage of publication + objects. + """ + + def supportsRetry(): + """Check whether the request supports retry + + Return a boolean value indicating whether the request can be retried. + """ + + def retry(): + """Return a retry request + + Return a request suitable for repeating the publication attempt. + """ + + publication = Attribute("""The request's publication object + + The publication object, an IRequestPublication provides + application-specific functionality hooks. + """) + + def setPublication(publication): + """Set the request's publication object + """ + + def traverse(object): + """Traverse from the given object to the published object + + The published object is returned. + + The following hook methods on the publication will be called: + + - callTraversalHooks is called before each step and after + the last step. + + - traverseName to actually do a single traversal + + """ + + def processInputs(): + """Do any input processing that needs to be done before traversing + + This is done after construction to allow the publisher to + handle errors that arise. + """ + + +class IApplicationRequest(IEnumerableMapping): + """Features that support application logic + """ + + principal = Attribute("""Principal object associated with the request + This is a read-only attribute. + """) + + body = Attribute("""The body of the request as a string""") + + bodyFile = Attribute("""The body of the request as a file""") + + def __getitem__(key): + """Return request data + + The only request data are environment variables. + """ + + environment = Attribute( + """Request environment data + + This is a read-only mapping from variable name to value. + """) + + +class IResponse(IPublisherResponse, IApplicationResponse): + """The basic response contract + """ + +class IRequest(IPublisherRequest, IPublicationRequest, IApplicationRequest): + """The basic request contract + """ diff --git a/tests/test_http.py b/tests/test_http.py new file mode 100644 index 0000000..ab6234f --- /dev/null +++ b/tests/test_http.py @@ -0,0 +1,485 @@ +# -*- coding: latin-1 -*- +############################################################################## +# +# Copyright (c) 2001, 2002 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""HTTP Publisher Tests + +$Id: test_http.py,v 1.33 2004/04/07 14:36:48 jim Exp $ +""" +import unittest + +from zope.interface import implements +from zope.publisher.interfaces.logginginfo import ILoggingInfo +from zope.publisher.http import HTTPRequest, HTTPResponse +from zope.publisher.publish import publish +from zope.publisher.base import DefaultPublication +from zope.publisher.interfaces.http import IHTTPRequest, IHTTPResponse +from zope.publisher.interfaces.http import IHTTPApplicationResponse +from zope.publisher.interfaces import IResponse + +from zope.i18n.interfaces.locales import ILocale + +from zope.interface.verify import verifyObject + +from StringIO import StringIO + + +class UserStub: + implements(ILoggingInfo) + + def __init__(self, id): + self._id = id + + def getId(self): + return self._id + + def getLogMessage(self): + return self._id + + +class HTTPTests(unittest.TestCase): + + _testEnv = { + 'PATH_INFO': '/folder/item', + 'a': '5', + 'b': 6, + 'SERVER_URL': 'http://foobar.com', + 'HTTP_HOST': 'foobar.com', + 'CONTENT_LENGTH': '0', + 'HTTP_AUTHORIZATION': 'Should be in accessible', + 'GATEWAY_INTERFACE': 'TestFooInterface/1.0', + 'HTTP_OFF_THE_WALL': "Spam 'n eggs", + 'HTTP_ACCEPT_CHARSET': 'ISO-8859-1, UTF-8;q=0.66, UTF-16;q=0.33', + } + + def setUp(self): + class AppRoot: + " " + + class Folder: + " " + + class Item: + " " + def __call__(self, a, b): + return "%s, %s" % (`a`, `b`) + + self.app = AppRoot() + self.app.folder = Folder() + self.app.folder.item = Item() + self.app.xxx = Item() + + def _createRequest(self, extra_env={}, body="", outstream=None): + env = self._testEnv.copy() + env.update(extra_env) + if len(body): + env['CONTENT_LENGTH'] = str(len(body)) + + publication = DefaultPublication(self.app) + if outstream is None: + outstream = StringIO() + instream = StringIO(body) + request = HTTPRequest(instream, outstream, env) + request.setPublication(publication) + return request + + def _publisherResults(self, extra_env={}, body=""): + outstream = StringIO() + request = self._createRequest(extra_env, body, outstream=outstream) + publish(request, handle_errors=0) + return outstream.getvalue() + + + def test_repr(self): + request = self._createRequest() + expect = '<%s.%s instance URL=http://foobar.com>' % ( + request.__class__.__module__, request.__class__.__name__) + self.assertEqual(repr(request), expect) + + def testTraversalToItem(self): + res = self._publisherResults() + self.failUnlessEqual( + res, + "Status: 200 Ok\r\n" + "Content-Length: 6\r\n" + "X-Powered-By: Zope (www.zope.org), Python (www.python.org)\r\n" + "\r\n" + "'5', 6") + + def testRedirect(self): + # test HTTP/1.0 + env = {'SERVER_PROTOCOL':'HTTP/1.0'} + + request = self._createRequest(env, '') + location = request.response.redirect('http://foobar.com/redirected') + self.assertEquals(location, 'http://foobar.com/redirected') + self.assertEquals(request.response.getStatus(), 302) + self.assertEquals(request.response._headers['location'], location) + + # test HTTP/1.1 + env = {'SERVER_PROTOCOL':'HTTP/1.1'} + + request = self._createRequest(env, '') + location = request.response.redirect('http://foobar.com/redirected') + self.assertEquals(request.response.getStatus(), 303) + + # test explicit status + request = self._createRequest(env, '') + request.response.redirect('http://foobar.com/explicit', 304) + self.assertEquals(request.response.getStatus(), 304) + + def testRequestEnvironment(self): + req = self._createRequest() + publish(req, handle_errors=0) # Force expansion of URL variables + + self.assertEquals(str(req.URL), 'http://foobar.com/folder/item') + self.assertEquals(req.URL['-1'], 'http://foobar.com/folder') + self.assertEquals(req.URL['-2'], 'http://foobar.com') + self.assertRaises(KeyError, req.URL.__getitem__, '-3') + + self.assertEquals(req.URL['0'], 'http://foobar.com') + self.assertEquals(req.URL['1'], 'http://foobar.com/folder') + self.assertEquals(req.URL['2'], 'http://foobar.com/folder/item') + self.assertRaises(KeyError, req.URL.__getitem__, '3') + + self.assertEquals(req.URL.get('0'), 'http://foobar.com') + self.assertEquals(req.URL.get('1'), 'http://foobar.com/folder') + self.assertEquals(req.URL.get('2'), 'http://foobar.com/folder/item') + self.assertEquals(req.URL.get('3', 'none'), 'none') + + self.assertEquals(req['SERVER_URL'], 'http://foobar.com') + self.assertEquals(req['HTTP_HOST'], 'foobar.com') + self.assertEquals(req['PATH_INFO'], '/folder/item') + self.assertEquals(req['CONTENT_LENGTH'], '0') + self.assertRaises(KeyError, req.__getitem__, 'HTTP_AUTHORIZATION') + self.assertEquals(req['GATEWAY_INTERFACE'], 'TestFooInterface/1.0') + self.assertEquals(req['HTTP_OFF_THE_WALL'], "Spam 'n eggs") + + self.assertRaises(KeyError, req.__getitem__, + 'HTTP_WE_DID_NOT_PROVIDE_THIS') + + def testRequestLocale(self): + eq = self.assertEqual + unless = self.failUnless + for httplang in ('it', 'it-ch', 'it-CH', 'IT', 'IT-CH', 'IT-ch'): + req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': httplang}) + locale = req.locale + unless(ILocale.providedBy(locale)) + parts = httplang.split('-') + lang = parts.pop(0).lower() + territory = variant = None + if parts: + territory = parts.pop(0).upper() + if parts: + variant = parts.pop(0).upper() + eq(locale.id.language, lang) + eq(locale.id.territory, territory) + eq(locale.id.variant, variant) + # Now test for non-existant locale fallback + req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': 'xx'}) + locale = req.locale + unless(ILocale.providedBy(locale)) + eq(locale.id.language, None) + eq(locale.id.territory, None) + eq(locale.id.variant, None) + + # If the first language is not available we should try others + req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': 'xx,en;q=0.5'}) + locale = req.locale + unless(ILocale.providedBy(locale)) + eq(locale.id.language, 'en') + eq(locale.id.territory, None) + eq(locale.id.variant, None) + + # Regression test: there was a bug where territory and variant were + # not reset + req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': 'xx-YY,en;q=0.5'}) + locale = req.locale + unless(ILocale.providedBy(locale)) + eq(locale.id.language, 'en') + eq(locale.id.territory, None) + eq(locale.id.variant, None) + + def testCookies(self): + cookies = { + 'HTTP_COOKIE': 'foo=bar; spam="eggs", this="Should be accepted"' + } + req = self._createRequest(extra_env=cookies) + + self.assertEquals(req.cookies[u'foo'], u'bar') + self.assertEquals(req[u'foo'], u'bar') + + self.assertEquals(req.cookies[u'spam'], u'eggs') + self.assertEquals(req[u'spam'], u'eggs') + + self.assertEquals(req.cookies[u'this'], u'Should be accepted') + self.assertEquals(req[u'this'], u'Should be accepted') + + def testHeaders(self): + headers = { + 'TEST_HEADER': 'test', + 'Another-Test': 'another', + } + req = self._createRequest(extra_env=headers) + self.assertEquals(req.headers[u'TEST_HEADER'], u'test') + self.assertEquals(req.headers[u'TEST-HEADER'], u'test') + self.assertEquals(req.headers[u'test_header'], u'test') + self.assertEquals(req.getHeader('TEST_HEADER', literal=True), u'test') + self.assertEquals(req.getHeader('TEST-HEADER', literal=True), None) + self.assertEquals(req.getHeader('test_header', literal=True), None) + self.assertEquals(req.getHeader('Another-Test', literal=True), + 'another') + + def testBasicAuth(self): + from zope.publisher.interfaces.http import IHTTPCredentials + import base64 + req = self._createRequest() + verifyObject(IHTTPCredentials, req) + lpq = req._authUserPW() + self.assertEquals(lpq, None) + env = {} + login, password = ("tim", "123") + s = base64.encodestring("%s:%s" % (login, password)).rstrip() + env['HTTP_AUTHORIZATION'] = "Basic %s" % s + req = self._createRequest(env) + lpw = req._authUserPW() + self.assertEquals(lpw, (login, password)) + + def testSetPrincipal(self): + class HTTPTaskStub: + auth_user_name = None + def setAuthUserName(self, name): + self.auth_user_name = name + + task = HTTPTaskStub() + req = self._createRequest(outstream=task) + req.setPrincipal(UserStub("jim")) + self.assert_(not req.response._outstream.auth_user_name) + req = self._createRequest(outstream=task) + req.response.setHTTPTransaction(task) + req.setPrincipal(UserStub("jim")) + self.assertEquals(req.response.http_transaction.auth_user_name, "jim") + + def testIPresentationRequest(self): + # test the IView request + r = self._createRequest() + + self.assertEqual(r.getPresentationSkin(), '') + r.setPresentationSkin('morefoo') + self.assertEqual(r.getPresentationSkin(), 'morefoo') + + def test_method(self): + r = self._createRequest(extra_env={'REQUEST_METHOD':'SPAM'}) + self.assertEqual(r.method, 'SPAM') + r = self._createRequest(extra_env={'REQUEST_METHOD':'eggs'}) + self.assertEqual(r.method, 'EGGS') + + def test_setApplicationServer(self): + req = self._createRequest() + req.setApplicationServer('foo') + self.assertEquals(req._app_server, 'http://foo') + req.setApplicationServer('foo', proto='https') + self.assertEquals(req._app_server, 'https://foo') + req.setApplicationServer('foo', proto='https', port=8080) + self.assertEquals(req._app_server, 'https://foo:8080') + req.setApplicationServer('foo', proto='http', port='9673') + self.assertEquals(req._app_server, 'http://foo:9673') + req.setApplicationServer('foo', proto='https', port=443) + self.assertEquals(req._app_server, 'https://foo') + req.setApplicationServer('foo', proto='https', port='443') + self.assertEquals(req._app_server, 'https://foo') + req.setApplicationServer('foo', port=80) + self.assertEquals(req._app_server, 'http://foo') + req.setApplicationServer('foo', proto='telnet', port=80) + self.assertEquals(req._app_server, 'telnet://foo:80') + + def test_setApplicationNames(self): + req = self._createRequest() + names = ['x', 'y', 'z'] + req.setVirtualHostRoot(names) + self.assertEquals(req._app_names, ['x', 'y', 'z']) + names[0] = 'muahahahaha' + self.assertEquals(req._app_names, ['x', 'y', 'z']) + + def test_setVirtualHostRoot(self): + req = self._createRequest() + req._traversed_names = ['x', 'y'] + req._last_obj_traversed = object() + req.setVirtualHostRoot() + self.failIf(req._traversed_names) + self.assertEquals(req._vh_root, req._last_obj_traversed) + + def test_getVirtualHostRoot(self): + req = self._createRequest() + self.assertEquals(req.getVirtualHostRoot(), None) + req._vh_root = object() + self.assertEquals(req.getVirtualHostRoot(), req._vh_root) + + def test_traverse(self): + req = self._createRequest() + req.traverse(self.app) + self.assertEquals(req._traversed_names, ['folder', 'item']) + + # setting it during traversal matters + req = self._createRequest() + def hook(self, object, req=req, app=self.app): + if object is app.folder: + req.setVirtualHostRoot() + req.publication.callTraversalHooks = hook + req.traverse(self.app) + self.assertEquals(req._traversed_names, ['item']) + self.assertEquals(req._vh_root, self.app.folder) + + def testInterface(self): + from zope.publisher.interfaces.http import IHTTPCredentials + from zope.publisher.interfaces.http import IHTTPApplicationRequest + rq = self._createRequest() + verifyObject(IHTTPRequest, rq) + verifyObject(IHTTPCredentials, rq) + verifyObject(IHTTPApplicationRequest, rq) + + def testDeduceServerURL(self): + req = self._createRequest() + deduceServerURL = req._HTTPRequest__deduceServerURL + req._environ = {'HTTP_HOST': 'example.com:80'} + self.assertEquals(deduceServerURL(), 'http://example.com') + req._environ = {'HTTP_HOST': 'example.com:8080'} + self.assertEquals(deduceServerURL(), 'http://example.com:8080') + req._environ = {'HTTP_HOST': 'example.com:443', 'HTTPS': 'on'} + self.assertEquals(deduceServerURL(), 'https://example.com') + req._environ = {'HTTP_HOST': 'example.com:80', 'HTTPS': 'ON'} + self.assertEquals(deduceServerURL(), 'https://example.com:80') + req._environ = {'HTTP_HOST': 'example.com:8080', + 'SERVER_PORT_SECURE': '1'} + self.assertEquals(deduceServerURL(), 'https://example.com:8080') + req._environ = {'SERVER_NAME': 'example.com', 'SERVER_PORT':'8080', + 'SERVER_PORT_SECURE': '0'} + self.assertEquals(deduceServerURL(), 'http://example.com:8080') + req._environ = {'SERVER_NAME': 'example.com'} + self.assertEquals(deduceServerURL(), 'http://example.com') + + def testUnicodeURLs(self): + req = self._createRequest( + {'PATH_INFO': '/%C3%A4%C3%B6/%C3%BC%C3%9F/foo/bar.html'}) + self.assertEqual(req._traversal_stack, + [u'bar.html', u'foo', u'üß', u'äö']) + + +class ConcreteHTTPTests(HTTPTests): + """Tests that we don't have to worry about subclasses inheriting and + breaking. + """ + + def test_shiftNameToApplication(self): + r = self._createRequest() + publish(r, handle_errors=0) + appurl = r.getApplicationURL() + + # Verify that we can shift. It would be a little more realistic + # if we could test this during traversal, but the api doesn't + # let us do that. + r = self._createRequest(extra_env={"PATH_INFO": "/xxx"}) + publish(r, handle_errors=0) + r.shiftNameToApplication() + self.assertEquals(r.getApplicationURL(), appurl+"/xxx") + + # Verify that we can only shift if we've traversed only a single name + r = self._createRequest(extra_env={"PATH_INFO": "/folder/item"}) + publish(r, handle_errors=0) + self.assertRaises(ValueError, r.shiftNameToApplication) + + + +class TestHTTPResponse(unittest.TestCase): + + def testInterface(self): + rp = HTTPResponse(StringIO()) + verifyObject(IHTTPResponse, rp) + verifyObject(IHTTPApplicationResponse, rp) + verifyObject(IResponse, rp) + + def _createResponse(self): + stream = StringIO() + response = HTTPResponse(stream) + return response, stream + + def _parseResult(self, result): + hdrs_text, body = result.split("\r\n\r\n", 1) + headers = {} + for line in hdrs_text.splitlines(): + key, val = line.split(":", 1) + headers[key.strip()] = val.strip() + return headers, body + + def _getResultFromResponse(self, body, charset=None, headers=None): + response, stream = self._createResponse() + if charset is not None: + response.setCharset() + if headers is not None: + for hdr, val in headers.iteritems(): + response.setHeader(hdr, val) + response.setBody(body) + response.outputBody() + return self._parseResult(stream.getvalue()) + + def testContentLength(self): + eq = self.failUnlessEqual + + headers, body = self._getResultFromResponse("test", "utf-8", + {"content-type": "text/plain"}) + eq("4", headers["Content-Length"]) + eq("test", body) + + headers, body = self._getResultFromResponse( + u'\u0442\u0435\u0441\u0442', "utf-8", + {"content-type": "text/plain"}) + eq("8", headers["Content-Length"]) + eq('\xd1\x82\xd0\xb5\xd1\x81\xd1\x82', body) + + def testContentType(self): + eq = self.failUnlessEqual + + headers, body = self._getResultFromResponse("test", "utf-8") + eq("", headers.get("Content-Type", "")) + eq("test", body) + + headers, body = self._getResultFromResponse("test", + headers={"content-type": "text/plain"}) + eq("text/plain;charset=utf-8", headers["Content-Type"]) + eq("test", body) + + headers, body = self._getResultFromResponse("test", "utf-8", + {"content-type": "text/html"}) + eq("text/html;charset=utf-8", headers["Content-Type"]) + eq("test", body) + + headers, body = self._getResultFromResponse("test", "utf-8", + {"content-type": "text/plain;charset=cp1251"}) + eq("text/plain;charset=cp1251", headers["Content-Type"]) + eq("test", body) + + headers, body = self._getResultFromResponse("test", "utf-8", + {"content-type": "image/gif"}) + eq("image/gif", headers["Content-Type"]) + eq("test", body) + + +def test_suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(ConcreteHTTPTests)) + suite.addTest(unittest.makeSuite(TestHTTPResponse)) + return suite + + +if __name__ == '__main__': + unittest.main() |