summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarius Gedminas <marius@gedmin.as>2004-05-12 20:00:38 +0000
committerMarius Gedminas <marius@gedmin.as>2004-05-12 20:00:38 +0000
commit9a4cf5a13af8a9e36c36fff589d919c6e55aa277 (patch)
treed0dee5f999bde1eda337bac2ac105818aba83148
downloadzope-publisher-monolithic-zope3-mgedmin-security.tar.gz
Made BaseRequest an IParticipation and replaced request.user withmonolithic-zope3-mgedmin-security
request.principal everywhere.
-rw-r--r--base.py502
-rw-r--r--browser.py786
-rw-r--r--http.py1029
-rw-r--r--interfaces/__init__.py416
-rw-r--r--tests/test_http.py485
5 files changed, 3218 insertions, 0 deletions
diff --git a/base.py b/base.py
new file mode 100644
index 0000000..0a929fc
--- /dev/null
+++ b/base.py
@@ -0,0 +1,502 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Base implementations of the Publisher objects
+
+Specifically, 'BaseRequest', 'BaseResponse', and 'DefaultPublication' are
+specified here.
+
+$Id: base.py,v 1.19 2004/05/06 10:12:13 philikon Exp $
+"""
+import traceback
+from cStringIO import StringIO
+
+from zope.interface import implements
+from zope.interface.common.mapping import IReadMapping, IEnumerableMapping
+from zope.exceptions import NotFoundError
+
+from zope.publisher.interfaces import IPublication
+from zope.publisher.interfaces import NotFound, DebugError, Unauthorized
+from zope.publisher.interfaces import IRequest, IResponse
+from zope.publisher.publish import mapply
+
+_marker = object()
+
+class BaseResponse(object):
+ """Base Response Class
+ """
+
+ __slots__ = (
+ '_body', # The response body
+ '_outstream', # The output stream
+ '_request', # The associated request (if any)
+ )
+
+ implements(IResponse)
+
+
+ def __init__(self, outstream):
+ self._body = ''
+ self._outstream = outstream
+
+ def outputBody(self):
+ 'See IPublisherResponse'
+ self._outstream.write(self._getBody())
+
+ def setBody(self, body):
+ 'See IPublisherResponse'
+ self._body = body
+
+ # This method is not part of this interface
+ def _getBody(self):
+ 'Returns a string representing the currently set body.'
+ return self._body
+
+ def reset(self):
+ 'See IPublisherResponse'
+ self._body = ""
+
+ def handleException(self, exc_info):
+ 'See IPublisherResponse'
+ traceback.print_exception(
+ exc_info[0], exc_info[1], exc_info[2], 100, self)
+
+ def internalError(self):
+ 'See IPublisherResponse'
+ pass
+
+ def retry(self):
+ 'See IPublisherResponse'
+ return self.__class__(self.outstream)
+
+ def write(self, string):
+ 'See IApplicationResponse'
+ self._body += string
+
+class RequestDataGetter(object):
+
+ implements(IReadMapping)
+
+ def __init__(self, request):
+ self.__get = getattr(request, self._gettrname)
+
+ def __getitem__(self, name):
+ return self.__get(name)
+
+ def get(self, name, default=None):
+ return self.__get(name, default)
+
+ def __contains__(self, key):
+ lookup = self.get(key, self)
+ return lookup is not self
+
+ has_key = __contains__
+
+class RequestDataMapper(object):
+
+ implements(IEnumerableMapping)
+
+ def __init__(self, request):
+ self.__map = getattr(request, self._mapname)
+
+ def __getitem__(self, name):
+ return self.__map[name]
+
+ def get(self, name, default=None):
+ return self.__map.get(name, default)
+
+ def __contains__(self, key):
+ lookup = self.get(key, self)
+ return lookup is not self
+
+ has_key = __contains__
+
+ def keys(self):
+ return self.__map.keys()
+
+ def __iter__(self):
+ return iter(self.keys())
+
+ def items(self):
+ return self.__map.items()
+
+ def values(self):
+ return self.__map.values()
+
+ def __len__(self):
+ return len(self.__map)
+
+class RequestDataProperty(object):
+
+ def __init__(self, gettr_class):
+ self.__gettr_class = gettr_class
+
+ def __get__(self, request, rclass=None):
+ if request is not None:
+ return self.__gettr_class(request)
+
+ def __set__(*args):
+ raise AttributeError, 'Unassignable attribute'
+
+
+class RequestEnvironment(RequestDataMapper):
+ _mapname = '_environ'
+
+class BaseRequest(object):
+ """Represents a publishing request.
+
+ This object provides access to request data. Request data may
+ vary depending on the protocol used.
+
+ Request objects are created by the object publisher and will be
+ passed to published objects through the argument name, REQUEST.
+
+ The request object is a mapping object that represents a
+ collection of variable to value mappings.
+ """
+
+ implements(IRequest)
+
+ __slots__ = (
+ '_held', # Objects held until the request is closed
+ '_traversed_names', # The names that have been traversed
+ '_last_obj_traversed', # Object that was traversed last
+ '_traversal_stack', # Names to be traversed, in reverse order
+ '_environ', # The request environment variables
+ '_response', # The response
+ '_args', # positional arguments
+ '_body_instream', # input stream
+ '_body', # The request body as a string
+ '_publication', # publication object
+ '_presentation_skin', # View skin
+ '_principal', # request principal, set by publication
+ 'interaction', # interaction, set by interaction
+ )
+
+ environment = RequestDataProperty(RequestEnvironment)
+
+ def __init__(self, body_instream, outstream, environ, response=None,
+ positional=()):
+ self._traversal_stack = []
+ self._last_obj_traversed = None
+ self._traversed_names = []
+ self._environ = environ
+
+ self._args = positional
+ if response is None:
+ self._response = self._createResponse(outstream)
+ else:
+ self._response = response
+ self._response._request = self
+
+ self._body_instream = body_instream
+ self._held = ()
+ self._principal = None
+ self.interaction = None
+
+ def setPrincipal(self, principal):
+ self._principal = principal
+
+ principal = property(lambda self: self._principal)
+
+ def _getPublication(self):
+ 'See IPublisherRequest'
+ return getattr(self, '_publication', None)
+
+ publication = property(_getPublication)
+
+
+ def processInputs(self):
+ 'See IPublisherRequest'
+ # Nothing to do here
+
+ def retry(self):
+ 'See IPublisherRequest'
+ raise TypeError('Retry is not supported')
+
+ def setPublication(self, pub):
+ 'See IPublisherRequest'
+ self._publication = pub
+
+ def supportsRetry(self):
+ 'See IPublisherRequest'
+ return 0
+
+ def traverse(self, object):
+ 'See IPublisherRequest'
+
+ publication = self.publication
+
+ traversal_stack = self._traversal_stack
+ traversed_names = self._traversed_names
+
+ self._last_obj_traversed = object
+
+ prev_object = None
+ while True:
+
+ if object is not prev_object:
+ # Invoke hooks (but not more than once).
+ publication.callTraversalHooks(self, object)
+
+ prev_object = object
+
+ if traversal_stack:
+ # Traverse to the next step.
+ entry_name = traversal_stack.pop()
+ traversed_names.append(entry_name)
+ subobject = publication.traverseName(
+ self, object, entry_name)
+ self._last_obj_traversed = object = subobject
+ else:
+ # Finished traversal.
+ break
+
+ return object
+
+ def close(self):
+ 'See IPublicationRequest'
+ self._held = None
+ self._response = None
+ self._body_instream = None
+ self._publication = None
+
+ def getPositionalArguments(self):
+ 'See IPublicationRequest'
+ return self._args
+
+ def _getResponse(self):
+ return self._response
+
+ response = property(_getResponse)
+
+ def getTraversalStack(self):
+ 'See IPublicationRequest'
+ return list(self._traversal_stack) # Return a copy
+
+ def hold(self, object):
+ 'See IPublicationRequest'
+ self._held = self._held + (object,)
+
+ def setTraversalStack(self, stack):
+ 'See IPublicationRequest'
+ self._traversal_stack[:] = list(stack)
+
+ def setPresentationSkin(self, skin):
+ 'See IPublicationRequest'
+ self._presentation_skin = skin
+
+ def getPresentationSkin(self):
+ 'See IPresentationRequest'
+ return getattr(self, '_presentation_skin', '')
+
+ def _getBody(self):
+ body = getattr(self, '_body', None)
+ if body is None:
+ s = self._body_instream
+ if s is None:
+ return None # XXX what should be returned here?
+ p = s.tell()
+ s.seek(0)
+ body = s.read()
+ s.seek(p)
+ self._body = body
+ return body
+
+ body = property(_getBody)
+
+ def _getBodyFile(self):
+ 'See IApplicationRequest'
+ return self._body_instream
+
+ bodyFile = property(_getBodyFile)
+
+ def __len__(self):
+ 'See Interface.Common.Mapping.IEnumerableMapping'
+ return len(self.keys())
+
+ def items(self):
+ 'See Interface.Common.Mapping.IEnumerableMapping'
+ result = []
+ get = self.get
+ for k in self.keys():
+ result.append((k, get(k)))
+ return result
+
+ def keys(self):
+ 'See Interface.Common.Mapping.IEnumerableMapping'
+ return self._environ.keys()
+
+ def __iter__(self):
+ return iter(self.keys())
+
+ def values(self):
+ 'See Interface.Common.Mapping.IEnumerableMapping'
+ result = []
+ get = self.get
+ for k in self.keys():
+ result.append(get(k))
+ return result
+
+ def __getitem__(self, key):
+ 'See Interface.Common.Mapping.IReadMapping'
+ result = self.get(key, _marker)
+ if result is _marker:
+ raise KeyError, key
+ else:
+ return result
+
+ def get(self, key, default=None):
+ 'See Interface.Common.Mapping.IReadMapping'
+
+ result = self._environ.get(key, self)
+ if result is not self: return result
+
+ return default
+
+ def __contains__(self, key):
+ 'See Interface.Common.Mapping.IReadMapping'
+ lookup = self.get(key, self)
+ return lookup is not self
+
+ has_key = __contains__
+
+ def _createResponse(self, outstream):
+ # Should be overridden by subclasses
+ return BaseResponse(outstream)
+
+ def __nonzero__(self):
+ # This is here to avoid calling __len__ for boolean tests
+ return 1
+
+ def __str__(self):
+ L1 = self.items()
+ L1.sort()
+ return "\n".join(map(lambda item: "%s:\t%s" % item, L1))
+
+ def _setupPath_helper(self, attr):
+ path = self.get(attr, "/").strip()
+ if path.endswith('/'):
+ # Remove trailing backslash, so that we will not get an empty
+ # last entry when splitting the path.
+ path = path[:-1]
+ self._endswithslash = True
+ else:
+ self._endswithslash = False
+
+ clean = []
+ for item in path.split('/'):
+ if not item or item == '.':
+ continue
+ elif item == '..':
+ try:
+ del clean[-1]
+ except IndexError:
+ raise NotFoundError('..')
+ else: clean.append(item)
+
+ clean.reverse()
+ self.setTraversalStack(clean)
+
+ self._path_suffix = None
+
+class TestRequest(BaseRequest):
+
+ __slots__ = ('_presentation_type', )
+
+ def __init__(self, path, body_instream=None, outstream=None, environ=None):
+ if environ is None:
+ environ = {}
+ environ['PATH_INFO'] = path
+ if body_instream is None:
+ body_instream = StringIO('')
+ if outstream is None:
+ outstream = StringIO()
+
+ super(TestRequest, self).__init__(body_instream, outstream, environ)
+
+
+class DefaultPublication(object):
+ """A stub publication.
+
+ This works just like Zope2's ZPublisher. It rejects any name
+ starting with an underscore and any objects (specifically: method)
+ that doesn't have a docstring.
+ """
+ implements(IPublication)
+
+ require_docstrings = True
+
+ def __init__(self, app):
+ self.app = app
+
+ def beforeTraversal(self, request):
+ # Lop off leading and trailing empty names
+ stack = request.getTraversalStack()
+ while stack and not stack[-1]:
+ stack.pop() # toss a trailing empty name
+ while stack and not stack[0]:
+ stack.pop(0) # toss a leading empty name
+ request.setTraversalStack(stack)
+
+ def getApplication(self, request):
+ return self.app
+
+ def callTraversalHooks(self, request, ob):
+ pass
+
+ def traverseName(self, request, ob, name, check_auth=1):
+ if name.startswith('_'):
+ raise Unauthorized("Name %s begins with an underscore" % `name`)
+ if hasattr(ob, name):
+ subob = getattr(ob, name)
+ else:
+ try:
+ subob = ob[name]
+ except (KeyError, IndexError,
+ TypeError, AttributeError):
+ raise NotFound(ob, name, request)
+ if self.require_docstrings and not getattr(subob, '__doc__', None):
+ raise DebugError(subob, 'Missing or empty doc string')
+ return subob
+
+ def getDefaultTraversal(self, request, ob):
+ return ob, ()
+
+ def afterTraversal(self, request, ob):
+ pass
+
+ def callObject(self, request, ob):
+ return mapply(ob, request.getPositionalArguments(), request)
+
+ def afterCall(self, request, ob):
+ pass
+
+ def handleException(self, object, request, exc_info, retry_allowed=1):
+ # Let the response handle it as best it can.
+ request.response.reset()
+ request.response.handleException(exc_info)
+
+
+class TestPublication(DefaultPublication):
+
+ def traverseName(self, request, ob, name, check_auth=1):
+ if hasattr(ob, name):
+ subob = getattr(ob, name)
+ else:
+ try:
+ subob = ob[name]
+ except (KeyError, IndexError,
+ TypeError, AttributeError):
+ raise NotFound(ob, name, request)
+ return subob
diff --git a/browser.py b/browser.py
new file mode 100644
index 0000000..3cde729
--- /dev/null
+++ b/browser.py
@@ -0,0 +1,786 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Browser-specific Publisher classes
+
+Here we define the specific 'BrowserRequest' and 'BrowserResponse' class. The
+big improvement of the 'BrowserRequest' to 'HTTPRequest' is that is can handle
+HTML form data and convert them into a Python-native format. Even file data is
+packaged into a nice, Python-friendly 'FileUpload' object.
+
+$Id: browser.py,v 1.31 2004/05/06 10:12:13 philikon Exp $
+"""
+import re
+from types import ListType, TupleType, StringType, StringTypes
+from cgi import FieldStorage, escape
+
+from zope.interface import implements
+from zope.i18n.interfaces import IUserPreferredLanguages
+from zope.i18n.interfaces import IUserPreferredCharsets
+from zope.publisher.interfaces.browser import IBrowserRequest
+from zope.publisher.interfaces.browser import IBrowserApplicationRequest
+
+from zope.publisher.http import HTTPRequest, HTTPResponse
+from zope.publisher.base import BaseRequest
+
+__metaclass__ = type # All classes are new style when run with Python 2.2+
+
+__ArrayTypes = (ListType, TupleType)
+
+start_of_header_search=re.compile('(<head[^>]*>)', re.I).search
+base_re_search=re.compile('(<base.*?>)',re.I).search
+isRelative = re.compile("[-_.!~*a-zA-z0-9'()@&=+$,]+(/|$)").match
+newlines = re.compile('\r\n|\n\r|\r')
+
+
+def is_text_html(content_type):
+ return content_type.startswith('text/html')
+
+# Flag Constants
+SEQUENCE = 1
+DEFAULT = 2
+RECORD = 4
+RECORDS = 8
+REC = RECORD | RECORDS
+CONVERTED = 32
+DEFAULTABLE_METHODS = 'GET', 'POST', 'HEAD'
+
+
+def field2string(v):
+ if hasattr(v, 'read'):
+ return v.read()
+ return str(v)
+
+def field2text(v, nl=newlines):
+ return nl.sub("\n", field2string(v))
+
+def field2required(v):
+ v = field2string(v)
+ if not v.strip():
+ raise ValueError, 'No input for required field<p>'
+ return v
+
+def field2int(v):
+ if isinstance(v, __ArrayTypes):
+ return map(field2int, v)
+ v = field2string(v)
+ if not v:
+ raise ValueError, 'Empty entry when <strong>integer</strong> expected'
+ try:
+ return int(v)
+ except ValueError:
+ raise ValueError, "An integer was expected in the value '%s'" % v
+
+def field2float(v):
+ if isinstance(v, __ArrayTypes):
+ return map(field2float, v)
+ v = field2string(v)
+ if not v:
+ raise ValueError, (
+ 'Empty entry when <strong>floating-point number</strong> expected')
+ try:
+ return float(v)
+ except ValueError:
+ raise ValueError, (
+ "A floating-point number was expected in the value '%s'" % v
+ )
+
+def field2long(v):
+ if isinstance(v, __ArrayTypes):
+ return map(field2long, v)
+ v = field2string(v)
+
+ # handle trailing 'L' if present.
+ if v and v[-1].upper() == 'L':
+ v = v[:-1]
+ if not v:
+ raise ValueError, 'Empty entry when <strong>integer</strong> expected'
+ try:
+ return long(v)
+ except ValueError:
+ raise ValueError, "A long integer was expected in the value '%s'" % v
+
+def field2tokens(v):
+ return field2string(v).split()
+
+def field2lines(v):
+ if isinstance(v, __ArrayTypes):
+ return [str(item) for item in v]
+ return field2text(v).splitlines()
+
+def field2boolean(v):
+ return bool(v)
+
+type_converters = {
+ 'float': field2float,
+ 'int': field2int,
+ 'long': field2long,
+ 'string': field2string,
+ 'required': field2required,
+ 'tokens': field2tokens,
+ 'lines': field2lines,
+ 'text': field2text,
+ 'boolean': field2boolean,
+ }
+
+get_converter = type_converters.get
+
+def registerTypeConverter(field_type, converter, replace=False):
+ """Add a custom type converter to the registry.
+
+ o If 'replace' is not true, raise a KeyError if a converter is
+ already registered for 'field_type'.
+ """
+ existing = type_converters.get(field_type)
+
+ if existing is not None and not replace:
+ raise KeyError, 'Existing converter for field_type: %s' % field_type
+
+ type_converters[field_type] = converter
+
+
+isCGI_NAME = {
+ # These fields are placed in request.environ instead of request.form.
+ 'SERVER_SOFTWARE' : 1,
+ 'SERVER_NAME' : 1,
+ 'GATEWAY_INTERFACE' : 1,
+ 'SERVER_PROTOCOL' : 1,
+ 'SERVER_PORT' : 1,
+ 'REQUEST_METHOD' : 1,
+ 'PATH_INFO' : 1,
+ 'PATH_TRANSLATED' : 1,
+ 'SCRIPT_NAME' : 1,
+ 'QUERY_STRING' : 1,
+ 'REMOTE_HOST' : 1,
+ 'REMOTE_ADDR' : 1,
+ 'AUTH_TYPE' : 1,
+ 'REMOTE_USER' : 1,
+ 'REMOTE_IDENT' : 1,
+ 'CONTENT_TYPE' : 1,
+ 'CONTENT_LENGTH' : 1,
+ 'SERVER_URL': 1,
+ }.has_key
+
+hide_key={
+ 'HTTP_AUTHORIZATION':1,
+ 'HTTP_CGI_AUTHORIZATION': 1,
+ }.has_key
+
+
+class Record:
+
+ def __getattr__(self, key, default=None):
+ if key in ('get', 'keys', 'items', 'values', 'copy',
+ 'has_key', '__contains__'):
+ return getattr(self.__dict__, key)
+ raise AttributeError, key
+
+ def __getitem__(self, key):
+ return self.__dict__[key]
+
+ def __str__(self):
+ L1 = self.__dict__.items()
+ L1.sort()
+ return ", ".join(["%s: %s" % item for item in L1])
+
+ def __repr__(self):
+ L1 = self.__dict__.items()
+ L1.sort()
+ return ', '.join(["%s: %s" % (key, repr(value)) for key, value in L1])
+
+
+class BrowserRequest(HTTPRequest):
+ implements(IBrowserRequest, IBrowserApplicationRequest)
+
+ __slots__ = (
+ 'form', # Form data
+ 'charsets', # helper attribute
+ '__meth',
+ '__tuple_items',
+ '__defaults',
+ )
+
+ # Set this to True in a subclass to redirect GET requests when the
+ # effective and actual URLs differ.
+ use_redirect = False
+
+ def __init__(self, body_instream, outstream, environ, response=None):
+ self.form = {}
+ self.charsets = None
+ super(BrowserRequest, self).__init__(
+ body_instream, outstream, environ, response)
+
+
+ def _createResponse(self, outstream):
+ # Should be overridden by subclasses
+ return BrowserResponse(outstream)
+
+ def _decode(self, text):
+ """Try to decode the text using one of the available charsets."""
+ if self.charsets is None:
+ envadapter = IUserPreferredCharsets(self)
+ self.charsets = envadapter.getPreferredCharsets()
+ for charset in self.charsets:
+ try:
+ text = unicode(text, charset)
+ break
+ except UnicodeError:
+ pass
+ return text
+
+ def processInputs(self):
+ 'See IPublisherRequest'
+
+ if self.method != 'GET':
+ # Process self.form if not a GET request.
+ fp = self._body_instream
+ else:
+ fp = None
+
+ fs = FieldStorage(fp=fp, environ=self._environ, keep_blank_values=1)
+
+ fslist = getattr(fs, 'list', None)
+ if fslist is not None:
+ self.__meth = None
+ self.__tuple_items = {}
+ self.__defaults = {}
+
+ # process all entries in the field storage (form)
+ for item in fslist:
+ self.__processItem(item)
+
+ if self.__defaults:
+ self.__insertDefaults()
+
+ if self.__tuple_items:
+ self.__convertToTuples()
+
+ if self.__meth:
+ self.setPathSuffix((self.__meth,))
+
+ _typeFormat = re.compile('([a-zA-Z][a-zA-Z0-9_]+|\\.[xy])$')
+
+ def __processItem(self, item):
+ """Process item in the field storage."""
+
+ # Check whether this field is a file upload object
+ # Note: A field exists for files, even if no filename was
+ # passed in and no data was uploaded. Therefore we can only
+ # tell by the empty filename that no upload was made.
+ key = item.name
+ if (hasattr(item, 'file') and hasattr(item, 'filename')
+ and hasattr(item,'headers')):
+ if (item.file and
+ (item.filename is not None and item.filename != ''
+ # RFC 1867 says that all fields get a content-type.
+ # or 'content-type' in map(lower, item.headers.keys())
+ )):
+ item = FileUpload(item)
+ else:
+ item = item.value
+
+ flags = 0
+ converter = None
+
+ # Loop through the different types and set
+ # the appropriate flags
+ # Syntax: var_name:type_name
+
+ # We'll search from the back to the front.
+ # We'll do the search in two steps. First, we'll
+ # do a string search, and then we'll check it with
+ # a re search.
+
+ while key:
+ pos = key.rfind(":")
+ if pos < 0:
+ break
+ match = self._typeFormat.match(key, pos + 1)
+ if match is None:
+ break
+
+ key, type_name = key[:pos], key[pos + 1:]
+
+ # find the right type converter
+ c = get_converter(type_name, None)
+
+ if c is not None:
+ converter = c
+ flags |= CONVERTED
+ elif type_name == 'list':
+ flags |= SEQUENCE
+ elif type_name == 'tuple':
+ self.__tuple_items[key] = 1
+ flags |= SEQUENCE
+ elif (type_name == 'method' or type_name == 'action'):
+ if key:
+ self.__meth = key
+ else:
+ self.__meth = item
+ elif (type_name == 'default_method'
+ or type_name == 'default_action') and not self.__meth:
+ if key:
+ self.__meth = key
+ else:
+ self.__meth = item
+ elif type_name == 'default':
+ flags |= DEFAULT
+ elif type_name == 'record':
+ flags |= RECORD
+ elif type_name == 'records':
+ flags |= RECORDS
+ elif type_name == 'ignore_empty' and not item:
+ # skip over empty fields
+ return
+
+ # Filter out special names from form:
+ if not (isCGI_NAME(key) or key.startswith('HTTP_')):
+ # Make it unicode
+ key = self._decode(key)
+ if type(item) == StringType:
+ item = self._decode(item)
+
+ if flags:
+ self.__setItemWithType(key, item, flags, converter)
+ else:
+ self.__setItemWithoutType(key, item)
+
+ def __setItemWithoutType(self, key, item):
+ """Set item value without explicit type."""
+ form = self.form
+ if key not in form:
+ form[key] = item
+ else:
+ found = form[key]
+ if isinstance(found, list):
+ found.append(item)
+ else:
+ form[key] = [found, item]
+
+ def __setItemWithType(self, key, item, flags, converter):
+ """Set item value with explicit type."""
+ #Split the key and its attribute
+ if flags & REC:
+ key, attr = self.__splitKey(key)
+
+ # defer conversion
+ if flags & CONVERTED:
+ try:
+ item = converter(item)
+ except:
+ if item or flags & DEFAULT or key not in self.__defaults:
+ raise
+ item = self.__defaults[key]
+ if flags & RECORD:
+ item = getattr(item, attr)
+ elif flags & RECORDS:
+ item = getattr(item[-1], attr)
+
+ # Determine which dictionary to use
+ if flags & DEFAULT:
+ form = self.__defaults
+ else:
+ form = self.form
+
+ # Insert in dictionary
+ if key not in form:
+ if flags & SEQUENCE:
+ item = [item]
+ if flags & RECORD:
+ r = form[key] = Record()
+ setattr(r, attr, item)
+ elif flags & RECORDS:
+ r = Record()
+ setattr(r, attr, item)
+ form[key] = [r]
+ else:
+ form[key] = item
+ else:
+ r = form[key]
+ if flags & RECORD:
+ if not flags & SEQUENCE:
+ setattr(r, attr, item)
+ else:
+ if not hasattr(r, attr):
+ setattr(r, attr, [item])
+ else:
+ getattr(r, attr).append(item)
+ elif flags & RECORDS:
+ last = r[-1]
+ if not hasattr(last, attr):
+ if flags & SEQUENCE:
+ item = [item]
+ setattr(last, attr, item)
+ else:
+ if flags & SEQUENCE:
+ getattr(last, attr).append(item)
+ else:
+ new = Record()
+ setattr(new, attr, item)
+ r.append(new)
+ else:
+ if isinstance(r, list):
+ r.append(item)
+ else:
+ form[key] = [r, item]
+
+ def __splitKey(self, key):
+ """Split the key and its attribute."""
+ i = key.rfind(".")
+ if i >= 0:
+ return key[:i], key[i + 1:]
+ return key, ""
+
+ def __convertToTuples(self):
+ """Convert form values to tuples."""
+ form = self.form
+
+ for key in self.__tuple_items:
+ if key in form:
+ form[key] = tuple(form[key])
+ else:
+ k, attr = self.__splitKey(key)
+
+ # remove any type_names in the attr
+ i = attr.find(":")
+ if i >= 0:
+ attr = attr[:i]
+
+ if k in form:
+ item = form[k]
+ if isinstance(item, Record):
+ if hasattr(item, attr):
+ setattr(item, attr, tuple(getattr(item, attr)))
+ else:
+ for v in item:
+ if hasattr(v, attr):
+ setattr(v, attr, tuple(getattr(v, attr)))
+
+ def __insertDefaults(self):
+ """Insert defaults into form dictionary."""
+ form = self.form
+
+ for keys, values in self.__defaults.iteritems():
+ if not keys in form:
+ form[keys] = values
+ else:
+ item = form[keys]
+ if isinstance(values, Record):
+ for k, v in values.items():
+ if not hasattr(item, k):
+ setattr(item, k, v)
+ elif isinstance(values, list):
+ for val in values:
+ if isinstance(val, Record):
+ for k, v in val.items():
+ for r in item:
+ if not hasattr(r, k):
+ setattr(r, k, v)
+ elif not val in item:
+ item.append(val)
+
+ def traverse(self, object):
+ 'See IPublisherRequest'
+
+ ob = super(BrowserRequest, self).traverse(object)
+ method = self.method
+
+ base_needed = 0
+ if self._path_suffix:
+ # We had a :method variable, so we need to set the base,
+ # but we don't look for default documents any more.
+ base_needed = 1
+ redirect = 0
+ elif method in DEFAULTABLE_METHODS:
+ # We need to check for default documents
+ publication = self.publication
+
+ nsteps = 0
+ ob, add_steps = publication.getDefaultTraversal(self, ob)
+ while add_steps:
+ nsteps += len(add_steps)
+ add_steps = list(add_steps)
+ add_steps.reverse()
+ self.setTraversalStack(add_steps)
+ ob = super(BrowserRequest, self).traverse(ob)
+ ob, add_steps = publication.getDefaultTraversal(self, ob)
+
+ if nsteps > self._endswithslash:
+ base_needed = 1
+ redirect = self.use_redirect and method == 'GET'
+
+
+ if base_needed:
+ url = self.getURL()
+ response = self.response
+ if redirect:
+ response.redirect(url)
+ return ''
+ elif not response.getBase():
+ response.setBase(url)
+
+ return ob
+
+ def keys(self):
+ 'See Interface.Common.Mapping.IEnumerableMapping'
+ d = {}
+ d.update(self._environ)
+ d.update(self._cookies)
+ d.update(self.form)
+ return d.keys()
+
+
+ def get(self, key, default=None):
+ 'See Interface.Common.Mapping.IReadMapping'
+
+ result = self.form.get(key, self)
+ if result is not self: return result
+
+ result = self._cookies.get(key, self)
+ if result is not self: return result
+
+ result = self._environ.get(key, self)
+ if result is not self: return result
+
+ return default
+
+
+class FileUpload(object):
+ '''File upload objects
+
+ File upload objects are used to represent file-uploaded data.
+
+ File upload objects can be used just like files.
+
+ In addition, they have a 'headers' attribute that is a dictionary
+ containing the file-upload headers, and a 'filename' attribute
+ containing the name of the uploaded file.
+ '''
+
+ def __init__(self, aFieldStorage):
+
+ file = aFieldStorage.file
+ if hasattr(file, '__methods__'):
+ methods = file.__methods__
+ else:
+ methods = ['close', 'fileno', 'flush', 'isatty',
+ 'read', 'readline', 'readlines', 'seek',
+ 'tell', 'truncate', 'write', 'writelines']
+
+ d = self.__dict__
+ for m in methods:
+ if hasattr(file,m):
+ d[m] = getattr(file,m)
+
+ self.headers = aFieldStorage.headers
+ self.filename = aFieldStorage.filename
+
+class RedirectingBrowserRequest(BrowserRequest):
+ """Browser requests that redirect when the actual and effective URLs differ
+ """
+
+ use_redirect = True
+
+class TestRequest(BrowserRequest):
+ """Browser request with a constructor convenient for testing
+ """
+
+ def __init__(self,
+ body_instream=None, outstream=None, environ=None, form=None,
+ skin='default',
+ **kw):
+
+ _testEnv = {
+ 'SERVER_URL': 'http://127.0.0.1',
+ 'HTTP_HOST': '127.0.0.1',
+ 'CONTENT_LENGTH': '0',
+ 'GATEWAY_INTERFACE': 'TestFooInterface/1.0',
+ }
+
+ if environ:
+ _testEnv.update(environ)
+ if kw:
+ _testEnv.update(kw)
+ if body_instream is None:
+ from StringIO import StringIO
+ body_instream = StringIO('')
+
+ if outstream is None:
+ from StringIO import StringIO
+ outstream = StringIO()
+
+ super(TestRequest, self).__init__(body_instream, outstream, _testEnv)
+ if form:
+ self.form.update(form)
+
+ self.setPresentationSkin(skin)
+
+ def setPrincipal(self, principal):
+ # HTTPRequest needs to notify the HTTPTask of the username.
+ # We don't want to have to stub HTTPTask in the tests.
+ BaseRequest.setPrincipal(self, principal)
+
+class BrowserResponse(HTTPResponse):
+ """Browser response
+ """
+
+ __slots__ = (
+ '_base', # The base href
+ )
+
+ def setBody(self, body):
+ """Sets the body of the response
+
+ Sets the return body equal to the (string) argument "body". Also
+ updates the "content-length" return header and sets the status to
+ 200 if it has not already been set.
+ """
+ if not isinstance(body, StringTypes):
+ body = unicode(body)
+
+ if 'content-type' not in self._headers:
+ c = (self.__isHTML(body) and 'text/html' or 'text/plain')
+ if self._charset is not None:
+ c += ';charset=' + self._charset
+ self.setHeader('content-type', c)
+
+ body = self.__insertBase(body)
+ self._body = body
+ self._updateContentLength()
+ if not self._status_set:
+ self.setStatus(200)
+
+ def __isHTML(self, str):
+ s = str.strip().lower()
+ return ((s.startswith('<html') and (s[5:6] in ' >'))
+ or s.startswith('<!doctype html'))
+
+
+ def __wrapInHTML(self, title, content):
+ t = escape(title)
+ return (
+ "<html><head><title>%s</title></head>\n"
+ "<body><h2>%s</h2>\n"
+ "%s\n"
+ "</body></html>\n" %
+ (t, t, content)
+ )
+
+
+ def __insertBase(self, body):
+ # Only insert a base tag if content appears to be html.
+ content_type = self.getHeader('content-type', '')
+ if content_type and not is_text_html(content_type):
+ return body
+
+ if getattr(self, '_base', ''):
+ if body:
+ match = start_of_header_search(body)
+ if match is not None:
+ index = match.start(0) + len(match.group(0))
+ ibase = base_re_search(body)
+ if ibase is None:
+ body = ('%s\n<base href="%s" />\n%s' %
+ (body[:index], self._base, body[index:]))
+ return body
+
+ def getBase(self):
+ return getattr(self, '_base', '')
+
+ def setBase(self, base):
+ self._base = base
+
+ def redirect(self, location, status=None):
+ base = getattr(self, '_base', '')
+ if base and isRelative(str(location)):
+ l = base.rfind('/')
+ if l >= 0:
+ base = base[:l+1]
+ else:
+ base += '/'
+ location = base + location
+
+ # XXX: HTTP redirects must provide an absolute location, see
+ # http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.30
+ # So, what if location is relative and base is unknown? Uncomment
+ # the following and you'll see that it actually happens.
+ #
+ # if isRelative(str(location)):
+ # raise AssertionError('Cannot determine absolute location')
+
+ return super(BrowserResponse, self).redirect(location, status)
+
+ def reset(self):
+ super(BrowserResponse, self).reset()
+ self._base = ''
+
+def normalize_lang(lang):
+ lang = lang.strip().lower()
+ lang = lang.replace('_', '-')
+ lang = lang.replace(' ', '')
+ return lang
+
+class BrowserLanguages:
+
+ implements(IUserPreferredLanguages)
+
+ def __init__(self, request):
+ self.request = request
+
+ def getPreferredLanguages(self):
+ '''See interface IUserPreferredLanguages'''
+ request = self.request
+ accept_langs = request.get('HTTP_ACCEPT_LANGUAGE', '').split(',')
+
+ # Normalize lang strings
+ accept_langs = map(normalize_lang, accept_langs)
+ # Then filter out empty ones
+ accept_langs = filter(None, accept_langs)
+
+ length = len(accept_langs)
+ accepts = []
+
+ for index, lang in enumerate(accept_langs):
+ l = lang.split(';', 2)
+
+ quality = None
+
+ if len(l) == 2:
+ q = l[1]
+ if q.startswith('q='):
+ q = q.split('=', 2)[1]
+ quality = float(q)
+ else:
+ # If not supplied, quality defaults to 1
+ quality = 1.0
+
+ if quality == 1.0:
+ # ... but we use 1.9 - 0.001 * position to
+ # keep the ordering between all items with
+ # 1.0 quality, which may include items with no quality
+ # defined, and items with quality defined as 1.
+ quality = 1.9 - (0.001 * index)
+
+ accepts.append((quality, l[0]))
+
+ # Filter langs with q=0, which means
+ # unwanted lang according to the spec
+ # See: http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.4
+ accepts = filter(lambda acc: acc[0], accepts)
+
+ accepts.sort()
+ accepts.reverse()
+
+ return [lang for quality, lang in accepts]
+
+
diff --git a/http.py b/http.py
new file mode 100644
index 0000000..059657e
--- /dev/null
+++ b/http.py
@@ -0,0 +1,1029 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""HTTP Publisher
+
+$Id: http.py,v 1.53 2004/05/06 10:12:13 philikon Exp $
+"""
+import re, time, random
+from urllib import quote, unquote, splitport
+from types import StringTypes, ClassType
+from cgi import escape
+
+from zope.interface import implements
+
+from zope.publisher.interfaces.http import IHTTPCredentials
+from zope.publisher.interfaces.http import IHTTPRequest
+from zope.publisher.interfaces.http import IHTTPApplicationRequest
+from zope.publisher.interfaces.http import IHTTPPublisher
+
+from zope.publisher.interfaces import Redirect
+from zope.publisher.interfaces.http import IHTTPResponse
+from zope.publisher.interfaces.http import IHTTPApplicationResponse
+from zope.publisher.interfaces.logginginfo import ILoggingInfo
+from zope.i18n.interfaces import IUserPreferredCharsets
+from zope.i18n.locales import locales, LoadLocaleError
+
+from zope.publisher.base import BaseRequest, BaseResponse
+from zope.publisher.base import RequestDataProperty, RequestDataMapper
+from zope.publisher.base import RequestDataGetter
+
+
+# Default Encoding
+ENCODING = 'UTF-8'
+
+class CookieMapper(RequestDataMapper):
+ _mapname = '_cookies'
+
+class HeaderGetter(RequestDataGetter):
+ _gettrname = 'getHeader'
+
+base64 = None
+
+def sane_environment(env):
+ # return an environment mapping which has been cleaned of
+ # funny business such as REDIRECT_ prefixes added by Apache
+ # or HTTP_CGI_AUTHORIZATION hacks.
+ dict = {}
+ for key, val in env.items():
+ while key.startswith('REDIRECT_'):
+ key = key[9:]
+ dict[key] = val
+ if 'HTTP_CGI_AUTHORIZATION' in dict:
+ dict['HTTP_AUTHORIZATION'] = dict.pop('HTTP_CGI_AUTHORIZATION')
+ return dict
+
+# Possible HTTP status responses
+status_reasons = {
+100: 'Continue',
+101: 'Switching Protocols',
+102: 'Processing',
+200: 'OK',
+201: 'Created',
+202: 'Accepted',
+203: 'Non-Authoritative Information',
+204: 'No Content',
+205: 'Reset Content',
+206: 'Partial Content',
+207: 'Multi-Status',
+300: 'Multiple Choices',
+301: 'Moved Permanently',
+302: 'Moved Temporarily',
+303: 'See Other',
+304: 'Not Modified',
+305: 'Use Proxy',
+307: 'Temporary Redirect',
+400: 'Bad Request',
+401: 'Unauthorized',
+402: 'Payment Required',
+403: 'Forbidden',
+404: 'Not Found',
+405: 'Method Not Allowed',
+406: 'Not Acceptable',
+407: 'Proxy Authentication Required',
+408: 'Request Time-out',
+409: 'Conflict',
+410: 'Gone',
+411: 'Length Required',
+412: 'Precondition Failed',
+413: 'Request Entity Too Large',
+414: 'Request-URI Too Large',
+415: 'Unsupported Media Type',
+416: 'Requested range not satisfiable',
+417: 'Expectation Failed',
+422: 'Unprocessable Entity',
+423: 'Locked',
+424: 'Failed Dependency',
+500: 'Internal Server Error',
+501: 'Not Implemented',
+502: 'Bad Gateway',
+503: 'Service Unavailable',
+504: 'Gateway Time-out',
+505: 'HTTP Version not supported',
+507: 'Insufficient Storage',
+}
+
+status_codes={}
+
+def init_status_codes():
+ # Add mappings for builtin exceptions and
+ # provide text -> error code lookups.
+ for key, val in status_reasons.items():
+ status_codes[val.replace(' ', '').lower()] = key
+ status_codes[val.lower()] = key
+ status_codes[key] = key
+ status_codes[str(key)] = key
+
+ en = [n.lower() for n in dir(__builtins__) if n.endswith('Error')]
+
+ for name in en:
+ status_codes[name] = 500
+
+init_status_codes()
+
+
+class URLGetter(object):
+
+ __slots__ = "__request"
+
+ def __init__(self, request):
+ self.__request = request
+
+ def __str__(self):
+ return self.__request.getURL()
+
+ def __getitem__(self, name):
+ url = self.get(name, None)
+ if url is None:
+ raise KeyError, name
+ return url
+
+ def get(self, name, default=None):
+ i = int(name)
+ try:
+ if i < 0:
+ i = -i
+ return self.__request.getURL(i)
+ else:
+ return self.__request.getApplicationURL(i)
+ except IndexError, v:
+ if v[0] == i:
+ return default
+ raise
+
+DEFAULT_PORTS = {'http': '80', 'https': '443'}
+STAGGER_RETRIES = True
+
+class HTTPRequest(BaseRequest):
+ """Model HTTP request data.
+
+ This object provides access to request data. This includes, the
+ input headers, form data, server data, and cookies.
+
+ Request objects are created by the object publisher and will be
+ passed to published objects through the argument name, REQUEST.
+
+ The request object is a mapping object that represents a
+ collection of variable to value mappings. In addition, variables
+ are divided into four categories:
+
+ - Environment variables
+
+ These variables include input headers, server data, and other
+ request-related data. The variable names are as <a
+ href="http://hoohoo.ncsa.uiuc.edu/cgi/env.html">specified</a>
+ in the <a
+ href="http://hoohoo.ncsa.uiuc.edu/cgi/interface.html">CGI
+ specification</a>
+
+ - Form data
+
+ These are data extracted from either a URL-encoded query
+ string or body, if present.
+
+ - Cookies
+
+ These are the cookie data, if present.
+
+ - Other
+
+ Data that may be set by an application object.
+
+ The form attribute of a request is actually a Field Storage
+ object. When file uploads are used, this provides a richer and
+ more complex interface than is provided by accessing form data as
+ items of the request. See the FieldStorage class documentation
+ for more details.
+
+ The request object may be used as a mapping object, in which case
+ values will be looked up in the order: environment variables,
+ other variables, form data, and then cookies.
+ """
+ implements(IHTTPCredentials, IHTTPRequest, IHTTPApplicationRequest)
+
+ __slots__ = (
+ '_auth', # The value of the HTTP_AUTHORIZATION header.
+ '_cookies', # The request cookies
+ '_path_suffix', # Extra traversal steps after normal traversal
+ '_retry_count', # How many times the request has been retried
+ '_app_names', # The application path as a sequence
+ '_app_server', # The server path of the application url
+ '_orig_env', # The original environment
+ '_endswithslash', # Does the given path end with /
+ 'method', # The upper-cased request method (REQUEST_METHOD)
+ '_locale', # The locale for the request
+ '_vh_root', # Object at the root of the virtual host
+ )
+
+ retry_max_count = 3 # How many times we're willing to retry
+
+ def __init__(self, body_instream, outstream, environ, response=None):
+ super(HTTPRequest, self).__init__(
+ body_instream, outstream, environ, response)
+
+ self._orig_env = environ
+ environ = sane_environment(environ)
+
+ if 'HTTP_AUTHORIZATION' in environ:
+ self._auth = environ['HTTP_AUTHORIZATION']
+ del environ['HTTP_AUTHORIZATION']
+ else:
+ self._auth = None
+
+ self.method = environ.get("REQUEST_METHOD", 'GET').upper()
+
+ self._environ = environ
+
+ self.__setupCookies()
+ self.__setupPath()
+ self.__setupURLBase()
+ self._vh_root = None
+ self.__setupLocale()
+
+ def __setupLocale(self):
+ # Import here to break import loops
+ from zope.publisher.browser import BrowserLanguages
+
+ self.response.setCharsetUsingRequest(self)
+ langs = BrowserLanguages(self).getPreferredLanguages()
+ for httplang in langs:
+ parts = (httplang.split('-') + [None, None])[:3]
+ try:
+ self._locale = locales.getLocale(*parts)
+ return
+ except LoadLocaleError:
+ # Just try the next combination
+ pass
+ else:
+ # No combination gave us an existing locale, so use the default,
+ # which is guaranteed to exist
+ self._locale = locales.getLocale(None, None, None)
+
+ def _getLocale(self):
+ return self._locale
+ locale = property(_getLocale)
+
+ def __setupURLBase(self):
+ get_env = self._environ.get
+ # Get base info first. This isn't likely to cause
+ # errors and might be useful to error handlers.
+ script = get_env('SCRIPT_NAME', '').strip()
+
+ # _script and the other _names are meant for URL construction
+ self._app_names = filter(None, script.split('/'))
+
+ # get server URL and store it too, since we are already looking it up
+ server_url = get_env('SERVER_URL', None)
+ if server_url is not None:
+ self._app_server = server_url = server_url.strip()
+ else:
+ server_url = self.__deduceServerURL()
+
+ if server_url.endswith('/'):
+ server_url = server_url[:-1]
+
+ # strip off leading /'s of script
+ while script.startswith('/'):
+ script = script[1:]
+
+ self._app_server = server_url
+
+ def __deduceServerURL(self):
+ environ = self._environ
+
+ if (environ.get('HTTPS', '').lower() == "on" or
+ environ.get('SERVER_PORT_SECURE') == "1"):
+ protocol = 'https'
+ else:
+ protocol = 'http'
+
+ if environ.has_key('HTTP_HOST'):
+ host = environ['HTTP_HOST'].strip()
+ hostname, port = splitport(host)
+ else:
+ hostname = environ.get('SERVER_NAME', '').strip()
+ port = environ.get('SERVER_PORT', '')
+
+ if port and port != DEFAULT_PORTS.get(protocol):
+ host = hostname + ':' + port
+ else:
+ host = hostname
+
+ return '%s://%s' % (protocol, host)
+
+ _cookieFormat = re.compile('[\x00- ]*'
+ # Cookie name
+ '([^\x00- ;,="]+)='
+ # Cookie value (either correct quoted or MSIE)
+ '(?:"([^"]*)"|([^\x00- ;,"]*))'
+ '(?:[\x00- ]*[;,])?[\x00- ]*')
+
+ def _parseCookies(self, text, result=None):
+ """Parse 'text' and return found cookies as 'result' dictionary."""
+
+ if result is None:
+ result = {}
+
+ cookieFormat = self._cookieFormat
+
+ pos = 0
+ ln = len(text)
+ while pos < ln:
+ match = cookieFormat.match(text, pos)
+ if match is None:
+ break
+
+ name = unicode(match.group(1), ENCODING)
+ if name not in result:
+ value, ms_value = match.group(2, 3)
+ if value is None:
+ value = ms_value
+ result[name] = unicode(value, ENCODING)
+
+ pos = match.end()
+
+ return result
+
+ def __setupCookies(self):
+ # Cookie values should *not* be appended to existing form
+ # vars with the same name - they are more like default values
+ # for names not otherwise specified in the form.
+ self._cookies = {}
+ cookie_header = self._environ.get('HTTP_COOKIE', None)
+ if cookie_header is not None:
+ self._parseCookies(cookie_header, self._cookies)
+
+ def __setupPath(self):
+ # The recommendation states that:
+ #
+ # Unless there is some compelling reason for a
+ # particular scheme to do otherwise, translating character sequences
+ # into UTF-8 (RFC 2279) [3] and then subsequently using the %HH
+ # encoding for unsafe octets is recommended.
+ #
+ # See: http://www.ietf.org/rfc/rfc2718.txt, Section 2.2.5
+ self._setupPath_helper("PATH_INFO")
+ stack = self.getTraversalStack()
+ stack = [unquote(seg).decode('utf-8') for seg in stack]
+ self.setTraversalStack(stack)
+
+ def supportsRetry(self):
+ 'See IPublisherRequest'
+ count = getattr(self, '_retry_count', 0)
+ if count < self.retry_max_count:
+ if STAGGER_RETRIES:
+ time.sleep(random.uniform(0, 2**(count)))
+ return True
+
+ def retry(self):
+ 'See IPublisherRequest'
+ count = getattr(self, '_retry_count', 0)
+ self._retry_count = count + 1
+ self._body_instream.seek(0)
+ new_response = self.response.retry()
+ request = self.__class__(
+ body_instream=self._body_instream,
+ outstream=None,
+ environ=self._orig_env,
+ response=new_response,
+ )
+ request.setPublication(self.publication)
+ request._retry_count = self._retry_count
+ return request
+
+ def traverse(self, object):
+ 'See IPublisherRequest'
+
+ ob = super(HTTPRequest, self).traverse(object)
+ if self._path_suffix:
+ self._traversal_stack = self._path_suffix
+ ob = super(HTTPRequest, self).traverse(ob)
+
+ return ob
+
+ def getHeader(self, name, default=None, literal=False):
+ 'See IHTTPRequest'
+ environ = self._environ
+ if not literal:
+ name = name.replace('-', '_').upper()
+ val = environ.get(name, None)
+ if val is not None:
+ return val
+ if not name.startswith('HTTP_'):
+ name='HTTP_%s' % name
+ return environ.get(name, default)
+
+ headers = RequestDataProperty(HeaderGetter)
+
+ def getCookies(self):
+ 'See IHTTPRequest'
+ return self._cookies
+
+ cookies = RequestDataProperty(CookieMapper)
+
+ def setPathSuffix(self, steps):
+ 'See IHTTPRequest'
+ steps = list(steps)
+ steps.reverse()
+ self._path_suffix = steps
+
+ def _authUserPW(self):
+ 'See IHTTPCredentials'
+ global base64
+ if self._auth:
+ if self._auth.lower().startswith('basic '):
+ if base64 is None:
+ import base64
+ name, password = base64.decodestring(
+ self._auth.split()[-1]).split(':')
+ return name, password
+
+ def unauthorized(self, challenge):
+ 'See IHTTPCredentials'
+ self._response.setHeader("WWW-Authenticate", challenge, True)
+ self._response.setStatus(401)
+
+ def setPrincipal(self, principal):
+ 'See IPublicationRequest'
+ super(HTTPRequest, self).setPrincipal(principal)
+
+ if self.response.http_transaction is not None:
+ logging_info = ILoggingInfo(principal)
+ message = logging_info.getLogMessage()
+ self.response.http_transaction.setAuthUserName(message)
+
+ def _createResponse(self, outstream):
+ # Should be overridden by subclasses
+ return HTTPResponse(outstream)
+
+
+ def getURL(self, level=0, path_only=False):
+ names = self._app_names + self._traversed_names
+ if level:
+ if level > len(names):
+ raise IndexError, level
+ names = names[:-level]
+ # See: http://www.ietf.org/rfc/rfc2718.txt, Section 2.2.5
+ names = [quote(name.encode("utf-8"), safe='/+@') for name in names]
+
+ if path_only:
+ if not names:
+ return '/'
+ return '/' + '/'.join(names)
+ else:
+ if not names:
+ return self._app_server
+ return "%s/%s" % (self._app_server, '/'.join(names))
+
+ def getApplicationURL(self, depth=0, path_only=False):
+ """See IHTTPApplicationRequest"""
+ if depth:
+ names = self._traversed_names
+ if depth > len(names):
+ raise IndexError, depth
+ names = self._app_names + names[:depth]
+ else:
+ names = self._app_names
+
+ # See: http://www.ietf.org/rfc/rfc2718.txt, Section 2.2.5
+ names = [quote(name.encode("utf-8"), safe='/+@') for name in names]
+
+ if path_only:
+ return names and ('/' + '/'.join(names)) or '/'
+ else:
+ return (names and ("%s/%s" % (self._app_server, '/'.join(names)))
+ or self._app_server)
+
+ def setApplicationServer(self, host, proto='http', port=None):
+ if port and str(port) != DEFAULT_PORTS.get(proto):
+ host = '%s:%s' % (host, port)
+ self._app_server = '%s://%s' % (proto, host)
+
+ def shiftNameToApplication(self):
+ """Add the name being traversed to the application name
+
+ This is only allowed in the case where the name is the first name.
+
+ A Value error is raise if the shift can't be performed.
+ """
+ if len(self._traversed_names) == 1:
+ self._app_names.append(self._traversed_names.pop())
+ return
+
+ raise ValueError("Can only shift leading traversal "
+ "names to application names")
+
+ def setVirtualHostRoot(self, names=()):
+ del self._traversed_names[:]
+ self._vh_root = self._last_obj_traversed
+ self._app_names = list(names)
+
+ def getVirtualHostRoot(self):
+ return self._vh_root
+
+ URL = RequestDataProperty(URLGetter)
+
+ def __repr__(self):
+ # Returns a *short* string.
+ return '<%s.%s instance URL=%s>' % (
+ self.__class__.__module__, self.__class__.__name__, str(self.URL))
+
+ def get(self, key, default=None):
+ 'See Interface.Common.Mapping.IReadMapping'
+
+ result = self._cookies.get(key, self)
+ if result is not self: return result
+
+ result = self._environ.get(key, self)
+ if result is not self: return result
+
+ return default
+
+ def keys(self):
+ 'See Interface.Common.Mapping.IEnumerableMapping'
+ d = {}
+ d.update(self._environ)
+ d.update(self._cookies)
+ return d.keys()
+
+
+class HTTPResponse(BaseResponse):
+ implements(IHTTPResponse, IHTTPApplicationResponse)
+
+ __slots__ = (
+ '_header_output', # Hook object to collaborate with a server
+ # for header generation.
+ '_headers',
+ '_cookies',
+ '_accumulated_headers', # Headers that can have multiples
+ '_wrote_headers',
+ '_streaming',
+ '_status', # The response status (usually an integer)
+ '_reason', # The reason that goes with the status
+ '_status_set', # Boolean: status explicitly set
+ '_charset', # String: character set for the output
+ 'http_transaction', # HTTPTask object
+ )
+
+
+ def __init__(self, outstream, header_output=None, http_transaction=None):
+ self._header_output = header_output
+ self.http_transaction = http_transaction
+
+ super(HTTPResponse, self).__init__(outstream)
+ self.reset()
+
+ def reset(self):
+ 'See IResponse'
+ super(HTTPResponse, self).reset()
+ self._headers = {}
+ self._cookies = {}
+ self._accumulated_headers = []
+ self._wrote_headers = False
+ self._streaming = False
+ self._status = 599
+ self._reason = 'No status set'
+ self._status_set = False
+ self._charset = None
+
+ def setHeaderOutput(self, header_output):
+ self._header_output = header_output
+
+ def setHTTPTransaction(self, http_transaction):
+ self.http_transaction = http_transaction
+
+ def setStatus(self, status, reason=None):
+ 'See IHTTPResponse'
+ if status is None:
+ status = 200
+ else:
+ if type(status) in StringTypes:
+ status = status.lower()
+ if status in status_codes:
+ status = status_codes[status]
+ else:
+ status = 500
+ self._status = status
+
+ if reason is None:
+ if status == 200:
+ reason = 'Ok'
+ elif status in status_reasons:
+ reason = status_reasons[status]
+ else:
+ reason = 'Unknown'
+ self._reason = reason
+ self._status_set = True
+
+
+ def getStatus(self):
+ 'See IHTTPResponse'
+ return self._status
+
+
+ def setHeader(self, name, value, literal=False):
+ 'See IHTTPResponse'
+
+ name = str(name)
+ value = str(value)
+
+ key = name.lower()
+ if key == 'set-cookie':
+ self.addHeader(name, value)
+ else:
+ name = literal and name or key
+ self._headers[name]=value
+
+
+ def addHeader(self, name, value):
+ 'See IHTTPResponse'
+ accum = self._accumulated_headers
+ accum.append('%s: %s' % (name, value))
+
+
+ def getHeader(self, name, default=None, literal=False):
+ 'See IHTTPResponse'
+ key = name.lower()
+ name = literal and name or key
+ return self._headers.get(name, default)
+
+ def getHeaders(self):
+ 'See IHTTPResponse'
+ result = {}
+ headers = self._headers
+
+ if (not self._streaming and not ('content-length' in headers)
+ and not ('transfer-encoding' in headers)):
+ self._updateContentLength()
+
+ result["X-Powered-By"] = "Zope (www.zope.org), Python (www.python.org)"
+
+ for key, val in headers.items():
+ if key.lower() == key:
+ # only change non-literal header names
+ key = key.capitalize()
+ start = 0
+ location = key.find('-', start)
+ while location >= start:
+ key = "%s-%s" % (key[:location],
+ key[location+1:].capitalize())
+ start = location + 1
+ location = key.find('-', start)
+ result[key] = val
+
+ return result
+
+
+ def appendToHeader(self, name, value, delimiter=','):
+ 'See IHTTPResponse'
+ headers = self._headers
+ if name in headers:
+ h = self._header[name]
+ h = "%s%s\r\n\t%s" % (h, delimiter, value)
+ else:
+ h = value
+ self.setHeader(name, h)
+
+
+ def appendToCookie(self, name, value):
+ 'See IHTTPResponse'
+ cookies = self._cookies
+ if name in cookies:
+ cookie = cookies[name]
+ else:
+ cookie = cookies[name] = {}
+ if 'value' in cookie:
+ cookie['value'] = '%s:%s' % (cookie['value'], value)
+ else:
+ cookie['value'] = value
+
+
+ def expireCookie(self, name, **kw):
+ 'See IHTTPResponse'
+ dict = {'max_age':0, 'expires':'Wed, 31-Dec-97 23:59:59 GMT'}
+ for k, v in kw.items():
+ if v is not None:
+ dict[k] = v
+ cookies = self._cookies
+ if name in cookies:
+ # Cancel previous setCookie().
+ del cookies[name]
+ self.setCookie(name, 'deleted', **dict)
+
+
+ def setCookie(self, name, value, **kw):
+ 'See IHTTPResponse'
+ cookies = self._cookies
+ cookie = cookies.setdefault(name, {})
+
+ for k, v in kw.items():
+ if v is not None:
+ cookie[k] = v
+
+ cookie['value'] = value
+
+
+ def getCookie(self, name, default=None):
+ 'See IHTTPResponse'
+ return self._cookies.get(name, default)
+
+
+ def setCharset(self, charset=None):
+ 'See IHTTPResponse'
+ self._charset = charset
+
+ def _updateContentType(self):
+ if self._charset is not None:
+ ctype = self.getHeader('content-type', '')
+ if ctype.startswith("text") and "charset" not in ctype:
+ self.setHeader('content-type',
+ ctype + ";charset=" + self._charset)
+
+ def setCharsetUsingRequest(self, request):
+ 'See IHTTPResponse'
+ envadapter = IUserPreferredCharsets(request, None)
+ if envadapter is None:
+ return
+
+ try:
+ charset = envadapter.getPreferredCharsets()[0]
+ except IndexError:
+ # Exception caused by empty list! This is okay though, since the
+ # browser just could have sent a '*', which means we can choose
+ # the encoding, which we do here now.
+ charset = 'utf-8'
+ self.setCharset(charset)
+
+ def setBody(self, body):
+ self._body = unicode(body)
+ if not self._status_set:
+ self.setStatus(200)
+
+ def handleException(self, exc_info):
+ """
+ Calls self.setBody() with an error response.
+ """
+ t, v = exc_info[:2]
+ if isinstance(t, ClassType):
+ title = tname = t.__name__
+ if issubclass(t, Redirect):
+ self.redirect(v.getLocation())
+ return
+ else:
+ title = tname = unicode(t)
+
+ # Throwing non-protocol-specific exceptions is a good way
+ # for apps to control the status code.
+ self.setStatus(tname)
+
+ body = self._html(title, "A server error occurred." )
+ self.setBody(body)
+
+
+ def internalError(self):
+ 'See IPublisherResponse'
+ self.setStatus(500, u"The engines can't take any more, Jim!")
+
+
+ def _html(self, title, content):
+ t = escape(title)
+ return (
+ u"<html><head><title>%s</title></head>\n"
+ u"<body><h2>%s</h2>\n"
+ u"%s\n"
+ u"</body></html>\n" %
+ (t, t, content)
+ )
+
+
+ def retry(self):
+ """
+ Returns a response object to be used in a retry attempt
+ """
+ return self.__class__(self._outstream,
+ self._header_output)
+
+ def _updateContentLength(self, data=None):
+ if data is None:
+ blen = str(len(self._body))
+ else:
+ blen = str(len(data))
+ if blen.endswith('L'):
+ blen = blen[:-1]
+ self.setHeader('content-length', blen)
+
+ def redirect(self, location, status=None):
+ """Causes a redirection without raising an error"""
+ if status is None:
+ # parse the HTTP version and set default accordingly
+ if (self._request.get("SERVER_PROTOCOL","HTTP/1.0") <
+ "HTTP/1.1"):
+ status=302
+ else:
+ status=303
+
+ self.setStatus(status)
+ self.setHeader('Location', location)
+ return location
+
+ def _cookie_list(self):
+ cookie_list = []
+ for name, attrs in self._cookies.items():
+
+ # Note that as of May 98, IE4 ignores cookies with
+ # quoted cookie attr values, so only the value part
+ # of name=value pairs may be quoted.
+
+ cookie='Set-Cookie: %s="%s"' % (name, attrs['value'])
+ for name, value in attrs.items():
+ name = name.lower()
+ if name == 'expires':
+ cookie = '%s; Expires=%s' % (cookie,value)
+ elif name == 'domain':
+ cookie = '%s; Domain=%s' % (cookie,value)
+ elif name == 'path':
+ cookie = '%s; Path=%s' % (cookie,value)
+ elif name == 'max_age':
+ cookie = '%s; Max-Age=%s' % (cookie,value)
+ elif name == 'comment':
+ cookie = '%s; Comment=%s' % (cookie,value)
+ elif name == 'secure' and value:
+ cookie = '%s; Secure' % cookie
+ cookie_list.append(cookie)
+
+ # XXX: Should really check size of cookies here!
+
+ return cookie_list
+
+
+ def getHeaderText(self, m):
+ lst = ['Status: %s %s' % (self._status, self._reason)]
+ items = m.items()
+ items.sort()
+ lst.extend(['%s: %s' % i for i in items])
+ lst.extend(self._cookie_list())
+ lst.extend(self._accumulated_headers)
+ return ('%s\r\n\r\n' % '\r\n'.join(lst))
+
+
+ def outputHeaders(self):
+ """This method outputs all headers.
+ Since it is a final output method, it must take care of all possible
+ unicode strings and encode them!
+ """
+ if self._charset is None:
+ self.setCharset('utf-8')
+ self._updateContentType()
+ encode = self._encode
+ headers = self.getHeaders()
+ # Clean these headers from unicode by possibly encoding them
+ headers = dict([(encode(key), encode(val))
+ for key, val in headers.iteritems()])
+ # Cleaning done.
+ header_output = self._header_output
+ if header_output is not None:
+ # Use the IHeaderOutput interface.
+ header_output.setResponseStatus(self._status, encode(self._reason))
+ header_output.setResponseHeaders(headers)
+ cookie_list = map(encode, self._cookie_list())
+ header_output.appendResponseHeaders(cookie_list)
+ accumulated_headers = map(encode, self._accumulated_headers)
+ header_output.appendResponseHeaders(accumulated_headers)
+ else:
+ # Write directly to outstream.
+ headers_text = self.getHeaderText(headers)
+ self._outstream.write(encode(headers_text))
+
+ def write(self, string):
+ """See IApplicationResponse
+
+ Return data as a stream
+
+ HTML data may be returned using a stream-oriented interface.
+ This allows the browser to display partial results while
+ computation of a response to proceed.
+
+ The published object should first set any output headers or
+ cookies on the response object and encode the string into
+ appropriate encoding.
+
+ Note that published objects must not generate any errors
+ after beginning stream-oriented output.
+
+ """
+ self.output(string)
+
+ def output(self, data):
+ """Output the data to the world. There are a couple of steps we have
+ to do:
+
+ 1. Check that there is a character encoding for the data. If not,
+ choose UTF-8. Note that if the charset is None, this is a sign of a
+ bug! The method setCharsetUsingRequest() specifically sets the
+ encoding to UTF-8, if none was found in the HTTP header. This
+ method should always be called when reading the HTTP request.
+
+ 2. Now that the encoding has been finalized, we can output the
+ headers.
+
+ 3. If the content type is text-based, let's encode the data and send
+ it also out the door.
+ """
+ if self._charset is None:
+ self.setCharset('utf-8')
+
+ if self.getHeader('content-type', '').startswith('text'):
+ data = self._encode(data)
+ self._updateContentLength(data)
+
+ if not self._wrote_headers:
+ self.outputHeaders()
+ self._wrote_headers = True
+
+ self._outstream.write(data)
+
+
+ def outputBody(self):
+ """Outputs the response body."""
+ self.output(self._body)
+
+
+ def _encode(self, text):
+ # Any method that calls this method has the responsibility to set
+ # the _charset variable (if None) to a non-None value (usually UTF-8)
+ if isinstance(text, unicode):
+ return text.encode(self._charset)
+ return text
+
+
+class DefaultPublisher:
+ implements(IHTTPPublisher)
+
+ def publishTraverse(self, request, name):
+ 'See IHTTPPublisher'
+
+ return getattr(self, name)
+
+def sort_charsets(x, y):
+ if y[1] == 'utf-8':
+ return 1
+ if x[1] == 'utf-8':
+ return -1
+ return cmp(y, x)
+
+
+class HTTPCharsets:
+ implements(IUserPreferredCharsets)
+
+ def __init__(self, request):
+ self.request = request
+
+ def getPreferredCharsets(self):
+ '''See interface IUserPreferredCharsets'''
+ charsets = []
+ sawstar = sawiso88591 = 0
+ for charset in self.request.get('HTTP_ACCEPT_CHARSET', '').split(','):
+ charset = charset.strip().lower()
+ if charset:
+ if ';' in charset:
+ charset, quality = charset.split(';')
+ if not quality.startswith('q='):
+ # not a quality parameter
+ quality = 1.0
+ else:
+ try:
+ quality = float(quality[2:])
+ except ValueError:
+ continue
+ else:
+ quality = 1.0
+ if quality == 0.0:
+ continue
+ if charset == '*':
+ sawstar = 1
+ if charset == 'iso-8859-1':
+ sawiso88591 = 1
+ charsets.append((quality, charset))
+ # Quoting RFC 2616, $14.2: If no "*" is present in an Accept-Charset
+ # field, then all character sets not explicitly mentioned get a
+ # quality value of 0, except for ISO-8859-1, which gets a quality
+ # value of 1 if not explicitly mentioned.
+ if not sawstar and not sawiso88591:
+ charsets.append((1.0, 'iso-8859-1'))
+ # UTF-8 is **always** preferred over anything else.
+ # Reason: UTF-8 is not specific and can encode the entire unicode
+ # range , unlike many other encodings. Since Zope can easily use very
+ # different ranges, like providing a French-Chinese dictionary, it is
+ # always good to use UTF-8.
+ charsets.sort(sort_charsets)
+ return [c[1] for c in charsets]
diff --git a/interfaces/__init__.py b/interfaces/__init__.py
new file mode 100644
index 0000000..e5f92c1
--- /dev/null
+++ b/interfaces/__init__.py
@@ -0,0 +1,416 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Interfaces for the publisher.
+
+$Id: __init__.py,v 1.16 2004/03/20 13:38:16 philikon Exp $
+"""
+
+from zope.interface import Interface
+from zope.interface import Attribute
+from zope.exceptions import Unauthorized
+from zope.exceptions import NotFoundError, INotFoundError
+from zope.component.interfaces import IPresentationRequest
+from zope.interface import implements
+from zope.interface.common.mapping import IEnumerableMapping
+from zope.interface.common.interfaces import IException
+from zope.security.interfaces import IParticipation
+
+class IPublishingException(IException):
+ pass
+
+class PublishingException(Exception):
+ implements(IPublishingException)
+
+class ITraversalException(IPublishingException):
+ pass
+
+class TraversalException(PublishingException):
+ implements(ITraversalException)
+
+class INotFound(INotFoundError, ITraversalException):
+ def getObject():
+ 'Returns the object that was being traversed.'
+
+ def getName():
+ 'Returns the name that was being traversed.'
+
+class NotFound(NotFoundError, TraversalException):
+ implements(INotFound)
+
+ def __init__(self, ob, name, request=None):
+ self.ob = ob
+ self.name = name
+
+ def getObject(self):
+ return self.ob
+
+ def getName(self):
+ return self.name
+
+ def __str__(self):
+ try:
+ ob = `self.ob`
+ except:
+ ob = 'unprintable object'
+ return 'Object: %s, name: %s' % (ob, `self.name`)
+
+class IDebugError(ITraversalException):
+ def getObject():
+ 'Returns the object being traversed.'
+
+ def getMessage():
+ 'Returns the debug message.'
+
+class DebugError(TraversalException):
+ implements(IDebugError)
+
+ def __init__(self, ob, message):
+ self.ob = ob
+ self.message = message
+
+ def getObject(self):
+ return self.ob
+
+ def getMessage(self):
+ return self.message
+
+ def __str__(self):
+ return self.message
+
+class IBadRequest(IPublishingException):
+ def __str__():
+ 'Returns the error message.'
+
+class BadRequest(PublishingException):
+
+ implements(IBadRequest)
+
+ def __init__(self, message):
+ self.message = message
+
+ def __str__(self):
+ return self.message
+
+class IRedirect(IPublishingException):
+ def getLocation():
+ 'Returns the location.'
+
+class Redirect(PublishingException):
+
+ implements(IRedirect)
+
+ def __init__(self, location):
+ self.location = location
+
+ def getLocation(self):
+ return self.location
+
+ def __str__(self):
+ return 'Location: %s' % self.location
+
+class IRetry(IPublishingException):
+ def getOriginalException():
+ 'Returns the original exception object.'
+
+class Retry(PublishingException):
+ """Raise this to retry a request.
+ """
+
+ implements(IRetry)
+
+ def __init__(self, orig_exc=None):
+ self.orig_exc = orig_exc
+
+ def getOriginalException(self):
+ return self.orig_exc
+
+ def __str__(self):
+ return repr(self.orig_exc)
+
+
+class IExceptionSideEffects(Interface):
+ '''An exception caught by the publisher is adapted to this so that
+ it can have persistent side-effects.'''
+
+ def __call__(obj, request, exc_info):
+ '''Effect persistent side-effects.
+
+ Arguments are:
+ obj context-wrapped object that was published
+ request the request
+ exc_info the exception info being handled
+ '''
+
+
+class IPublishTraverse(Interface):
+
+ def publishTraverse(request, name):
+ """Lookup a name
+
+ The request argument is the publisher request object.
+ """
+
+
+class IPublisher(Interface):
+
+ def publish(request):
+ """Publish a request
+
+ The request must be an IPublisherRequest.
+ """
+
+
+class IPublisherResponse(Interface):
+ """Interface used by the publsher
+ """
+
+ def setBody(result):
+ """Sets the response result value.
+ """
+
+ def reset():
+ """Resets response state on exceptions.
+ """
+
+ def handleException(exc_info):
+ """Handles an otherwise unhandled exception.
+
+ The publication object gets the first chance to handle an exception,
+ and if it doesn't have a good way to do it, it defers to the
+ response. Implementations should set the reponse body.
+ """
+
+ def internalError():
+ """Called when the exception handler bombs.
+
+ Should report back to the client that an internal error occurred.
+ """
+
+ def outputBody():
+ """Outputs the response to the client
+ """
+
+ def retry():
+ """Returns a retry response
+
+ Returns a response suitable for repeating the publication attempt.
+ """
+
+
+class IPublication(Interface):
+ """Object publication framework.
+
+ The responsibility of publication objects is to provide
+ application hooks for the publishing process. This allows
+ application-specific tasks, such as connecting to databases,
+ managing transactions, and setting security contexts to be invoked
+ during the publishing process.
+ """
+ # The order of the hooks mostly corresponds with the order in which
+ # they are invoked.
+
+ def beforeTraversal(request):
+ """Pre-traversal hook.
+
+ This is called *once* before any traversal has been done.
+ """
+
+ def getApplication(request):
+ """Returns the object where traversal should commence.
+ """
+
+ def callTraversalHooks(request, ob):
+ """Invokes any traversal hooks associated with the object.
+ """
+
+ def traverseName(request, ob, name, check_auth=1):
+ """Traverses to the next object.
+
+ If check_auth is set,
+ performs idenitification, authentication, and authorization.
+ Returns the subobject.
+ """
+
+ def afterTraversal(request, ob):
+ """Post-traversal hook.
+ """
+
+ def callObject(request, ob):
+ """Call the object, returning the result.
+
+ For GET/POST this means calling it, but for other methods
+ (including those of WebDAV and FTP) this might mean invoking
+ a method of an adapter.
+ """
+
+ def afterCall(request, ob):
+ """Post-callObject hook (if it was successful).
+ """
+
+ def handleException(object, request, exc_info, retry_allowed=1):
+ """Handle an exception
+
+ Either:
+ - sets the body of the response, request.response, or
+ - raises a Retry exception, or
+ - throws another exception, which is a Bad Thing.
+
+ Note that this method should not leak, which means that
+ exc_info must be set to some other value before exiting the method.
+ """
+
+
+class IApplicationResponse(Interface):
+ """Features that support application logic
+ """
+
+ def write(string):
+ """Output a string to the response body.
+ """
+
+
+class IPublicationRequest(IPresentationRequest, IParticipation):
+ """Interface provided by requests to IPublication objects
+ """
+
+ response = Attribute("""The request's response object
+
+ Return an IPublisherResponse for the request.
+ """)
+
+ def close():
+ """Release resources held by the request.
+ """
+
+ def hold(object):
+ """Hold a reference to an object until the request is closed
+ """
+
+ def getTraversalStack():
+ """Return the request traversal stack
+
+ This is a sequence of steps to traverse in reverse order. They
+ will be traversed from last to first.
+ """
+
+ def setTraversalStack(stack):
+ """Change the traversal stack.
+
+ See getTraversalStack.
+ """
+
+ def getPositionalArguments():
+ """Return the positional arguments given to the request.
+ """
+
+ def setPresentationSkin(skin):
+ """Set the skin to be used for the request.
+
+ It's up to the publication object to decide this.
+ """
+
+ def setPrincipal(principal):
+ """Set the principal attribute.
+
+ It should be IPrincipal wrapped in it's AuthenticationService's context.
+ """
+
+
+class IPublisherRequest(IPublicationRequest):
+ """Request interface use by the publisher
+
+ The responsibility of requests is to encapsulate protocol
+ specific details, especially wrt request inputs.
+
+ Request objects also serve as "context" objects, providing
+ construction of and access to responses and storage of publication
+ objects.
+ """
+
+ def supportsRetry():
+ """Check whether the request supports retry
+
+ Return a boolean value indicating whether the request can be retried.
+ """
+
+ def retry():
+ """Return a retry request
+
+ Return a request suitable for repeating the publication attempt.
+ """
+
+ publication = Attribute("""The request's publication object
+
+ The publication object, an IRequestPublication provides
+ application-specific functionality hooks.
+ """)
+
+ def setPublication(publication):
+ """Set the request's publication object
+ """
+
+ def traverse(object):
+ """Traverse from the given object to the published object
+
+ The published object is returned.
+
+ The following hook methods on the publication will be called:
+
+ - callTraversalHooks is called before each step and after
+ the last step.
+
+ - traverseName to actually do a single traversal
+
+ """
+
+ def processInputs():
+ """Do any input processing that needs to be done before traversing
+
+ This is done after construction to allow the publisher to
+ handle errors that arise.
+ """
+
+
+class IApplicationRequest(IEnumerableMapping):
+ """Features that support application logic
+ """
+
+ principal = Attribute("""Principal object associated with the request
+ This is a read-only attribute.
+ """)
+
+ body = Attribute("""The body of the request as a string""")
+
+ bodyFile = Attribute("""The body of the request as a file""")
+
+ def __getitem__(key):
+ """Return request data
+
+ The only request data are environment variables.
+ """
+
+ environment = Attribute(
+ """Request environment data
+
+ This is a read-only mapping from variable name to value.
+ """)
+
+
+class IResponse(IPublisherResponse, IApplicationResponse):
+ """The basic response contract
+ """
+
+class IRequest(IPublisherRequest, IPublicationRequest, IApplicationRequest):
+ """The basic request contract
+ """
diff --git a/tests/test_http.py b/tests/test_http.py
new file mode 100644
index 0000000..ab6234f
--- /dev/null
+++ b/tests/test_http.py
@@ -0,0 +1,485 @@
+# -*- coding: latin-1 -*-
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""HTTP Publisher Tests
+
+$Id: test_http.py,v 1.33 2004/04/07 14:36:48 jim Exp $
+"""
+import unittest
+
+from zope.interface import implements
+from zope.publisher.interfaces.logginginfo import ILoggingInfo
+from zope.publisher.http import HTTPRequest, HTTPResponse
+from zope.publisher.publish import publish
+from zope.publisher.base import DefaultPublication
+from zope.publisher.interfaces.http import IHTTPRequest, IHTTPResponse
+from zope.publisher.interfaces.http import IHTTPApplicationResponse
+from zope.publisher.interfaces import IResponse
+
+from zope.i18n.interfaces.locales import ILocale
+
+from zope.interface.verify import verifyObject
+
+from StringIO import StringIO
+
+
+class UserStub:
+ implements(ILoggingInfo)
+
+ def __init__(self, id):
+ self._id = id
+
+ def getId(self):
+ return self._id
+
+ def getLogMessage(self):
+ return self._id
+
+
+class HTTPTests(unittest.TestCase):
+
+ _testEnv = {
+ 'PATH_INFO': '/folder/item',
+ 'a': '5',
+ 'b': 6,
+ 'SERVER_URL': 'http://foobar.com',
+ 'HTTP_HOST': 'foobar.com',
+ 'CONTENT_LENGTH': '0',
+ 'HTTP_AUTHORIZATION': 'Should be in accessible',
+ 'GATEWAY_INTERFACE': 'TestFooInterface/1.0',
+ 'HTTP_OFF_THE_WALL': "Spam 'n eggs",
+ 'HTTP_ACCEPT_CHARSET': 'ISO-8859-1, UTF-8;q=0.66, UTF-16;q=0.33',
+ }
+
+ def setUp(self):
+ class AppRoot:
+ " "
+
+ class Folder:
+ " "
+
+ class Item:
+ " "
+ def __call__(self, a, b):
+ return "%s, %s" % (`a`, `b`)
+
+ self.app = AppRoot()
+ self.app.folder = Folder()
+ self.app.folder.item = Item()
+ self.app.xxx = Item()
+
+ def _createRequest(self, extra_env={}, body="", outstream=None):
+ env = self._testEnv.copy()
+ env.update(extra_env)
+ if len(body):
+ env['CONTENT_LENGTH'] = str(len(body))
+
+ publication = DefaultPublication(self.app)
+ if outstream is None:
+ outstream = StringIO()
+ instream = StringIO(body)
+ request = HTTPRequest(instream, outstream, env)
+ request.setPublication(publication)
+ return request
+
+ def _publisherResults(self, extra_env={}, body=""):
+ outstream = StringIO()
+ request = self._createRequest(extra_env, body, outstream=outstream)
+ publish(request, handle_errors=0)
+ return outstream.getvalue()
+
+
+ def test_repr(self):
+ request = self._createRequest()
+ expect = '<%s.%s instance URL=http://foobar.com>' % (
+ request.__class__.__module__, request.__class__.__name__)
+ self.assertEqual(repr(request), expect)
+
+ def testTraversalToItem(self):
+ res = self._publisherResults()
+ self.failUnlessEqual(
+ res,
+ "Status: 200 Ok\r\n"
+ "Content-Length: 6\r\n"
+ "X-Powered-By: Zope (www.zope.org), Python (www.python.org)\r\n"
+ "\r\n"
+ "'5', 6")
+
+ def testRedirect(self):
+ # test HTTP/1.0
+ env = {'SERVER_PROTOCOL':'HTTP/1.0'}
+
+ request = self._createRequest(env, '')
+ location = request.response.redirect('http://foobar.com/redirected')
+ self.assertEquals(location, 'http://foobar.com/redirected')
+ self.assertEquals(request.response.getStatus(), 302)
+ self.assertEquals(request.response._headers['location'], location)
+
+ # test HTTP/1.1
+ env = {'SERVER_PROTOCOL':'HTTP/1.1'}
+
+ request = self._createRequest(env, '')
+ location = request.response.redirect('http://foobar.com/redirected')
+ self.assertEquals(request.response.getStatus(), 303)
+
+ # test explicit status
+ request = self._createRequest(env, '')
+ request.response.redirect('http://foobar.com/explicit', 304)
+ self.assertEquals(request.response.getStatus(), 304)
+
+ def testRequestEnvironment(self):
+ req = self._createRequest()
+ publish(req, handle_errors=0) # Force expansion of URL variables
+
+ self.assertEquals(str(req.URL), 'http://foobar.com/folder/item')
+ self.assertEquals(req.URL['-1'], 'http://foobar.com/folder')
+ self.assertEquals(req.URL['-2'], 'http://foobar.com')
+ self.assertRaises(KeyError, req.URL.__getitem__, '-3')
+
+ self.assertEquals(req.URL['0'], 'http://foobar.com')
+ self.assertEquals(req.URL['1'], 'http://foobar.com/folder')
+ self.assertEquals(req.URL['2'], 'http://foobar.com/folder/item')
+ self.assertRaises(KeyError, req.URL.__getitem__, '3')
+
+ self.assertEquals(req.URL.get('0'), 'http://foobar.com')
+ self.assertEquals(req.URL.get('1'), 'http://foobar.com/folder')
+ self.assertEquals(req.URL.get('2'), 'http://foobar.com/folder/item')
+ self.assertEquals(req.URL.get('3', 'none'), 'none')
+
+ self.assertEquals(req['SERVER_URL'], 'http://foobar.com')
+ self.assertEquals(req['HTTP_HOST'], 'foobar.com')
+ self.assertEquals(req['PATH_INFO'], '/folder/item')
+ self.assertEquals(req['CONTENT_LENGTH'], '0')
+ self.assertRaises(KeyError, req.__getitem__, 'HTTP_AUTHORIZATION')
+ self.assertEquals(req['GATEWAY_INTERFACE'], 'TestFooInterface/1.0')
+ self.assertEquals(req['HTTP_OFF_THE_WALL'], "Spam 'n eggs")
+
+ self.assertRaises(KeyError, req.__getitem__,
+ 'HTTP_WE_DID_NOT_PROVIDE_THIS')
+
+ def testRequestLocale(self):
+ eq = self.assertEqual
+ unless = self.failUnless
+ for httplang in ('it', 'it-ch', 'it-CH', 'IT', 'IT-CH', 'IT-ch'):
+ req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': httplang})
+ locale = req.locale
+ unless(ILocale.providedBy(locale))
+ parts = httplang.split('-')
+ lang = parts.pop(0).lower()
+ territory = variant = None
+ if parts:
+ territory = parts.pop(0).upper()
+ if parts:
+ variant = parts.pop(0).upper()
+ eq(locale.id.language, lang)
+ eq(locale.id.territory, territory)
+ eq(locale.id.variant, variant)
+ # Now test for non-existant locale fallback
+ req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': 'xx'})
+ locale = req.locale
+ unless(ILocale.providedBy(locale))
+ eq(locale.id.language, None)
+ eq(locale.id.territory, None)
+ eq(locale.id.variant, None)
+
+ # If the first language is not available we should try others
+ req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': 'xx,en;q=0.5'})
+ locale = req.locale
+ unless(ILocale.providedBy(locale))
+ eq(locale.id.language, 'en')
+ eq(locale.id.territory, None)
+ eq(locale.id.variant, None)
+
+ # Regression test: there was a bug where territory and variant were
+ # not reset
+ req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': 'xx-YY,en;q=0.5'})
+ locale = req.locale
+ unless(ILocale.providedBy(locale))
+ eq(locale.id.language, 'en')
+ eq(locale.id.territory, None)
+ eq(locale.id.variant, None)
+
+ def testCookies(self):
+ cookies = {
+ 'HTTP_COOKIE': 'foo=bar; spam="eggs", this="Should be accepted"'
+ }
+ req = self._createRequest(extra_env=cookies)
+
+ self.assertEquals(req.cookies[u'foo'], u'bar')
+ self.assertEquals(req[u'foo'], u'bar')
+
+ self.assertEquals(req.cookies[u'spam'], u'eggs')
+ self.assertEquals(req[u'spam'], u'eggs')
+
+ self.assertEquals(req.cookies[u'this'], u'Should be accepted')
+ self.assertEquals(req[u'this'], u'Should be accepted')
+
+ def testHeaders(self):
+ headers = {
+ 'TEST_HEADER': 'test',
+ 'Another-Test': 'another',
+ }
+ req = self._createRequest(extra_env=headers)
+ self.assertEquals(req.headers[u'TEST_HEADER'], u'test')
+ self.assertEquals(req.headers[u'TEST-HEADER'], u'test')
+ self.assertEquals(req.headers[u'test_header'], u'test')
+ self.assertEquals(req.getHeader('TEST_HEADER', literal=True), u'test')
+ self.assertEquals(req.getHeader('TEST-HEADER', literal=True), None)
+ self.assertEquals(req.getHeader('test_header', literal=True), None)
+ self.assertEquals(req.getHeader('Another-Test', literal=True),
+ 'another')
+
+ def testBasicAuth(self):
+ from zope.publisher.interfaces.http import IHTTPCredentials
+ import base64
+ req = self._createRequest()
+ verifyObject(IHTTPCredentials, req)
+ lpq = req._authUserPW()
+ self.assertEquals(lpq, None)
+ env = {}
+ login, password = ("tim", "123")
+ s = base64.encodestring("%s:%s" % (login, password)).rstrip()
+ env['HTTP_AUTHORIZATION'] = "Basic %s" % s
+ req = self._createRequest(env)
+ lpw = req._authUserPW()
+ self.assertEquals(lpw, (login, password))
+
+ def testSetPrincipal(self):
+ class HTTPTaskStub:
+ auth_user_name = None
+ def setAuthUserName(self, name):
+ self.auth_user_name = name
+
+ task = HTTPTaskStub()
+ req = self._createRequest(outstream=task)
+ req.setPrincipal(UserStub("jim"))
+ self.assert_(not req.response._outstream.auth_user_name)
+ req = self._createRequest(outstream=task)
+ req.response.setHTTPTransaction(task)
+ req.setPrincipal(UserStub("jim"))
+ self.assertEquals(req.response.http_transaction.auth_user_name, "jim")
+
+ def testIPresentationRequest(self):
+ # test the IView request
+ r = self._createRequest()
+
+ self.assertEqual(r.getPresentationSkin(), '')
+ r.setPresentationSkin('morefoo')
+ self.assertEqual(r.getPresentationSkin(), 'morefoo')
+
+ def test_method(self):
+ r = self._createRequest(extra_env={'REQUEST_METHOD':'SPAM'})
+ self.assertEqual(r.method, 'SPAM')
+ r = self._createRequest(extra_env={'REQUEST_METHOD':'eggs'})
+ self.assertEqual(r.method, 'EGGS')
+
+ def test_setApplicationServer(self):
+ req = self._createRequest()
+ req.setApplicationServer('foo')
+ self.assertEquals(req._app_server, 'http://foo')
+ req.setApplicationServer('foo', proto='https')
+ self.assertEquals(req._app_server, 'https://foo')
+ req.setApplicationServer('foo', proto='https', port=8080)
+ self.assertEquals(req._app_server, 'https://foo:8080')
+ req.setApplicationServer('foo', proto='http', port='9673')
+ self.assertEquals(req._app_server, 'http://foo:9673')
+ req.setApplicationServer('foo', proto='https', port=443)
+ self.assertEquals(req._app_server, 'https://foo')
+ req.setApplicationServer('foo', proto='https', port='443')
+ self.assertEquals(req._app_server, 'https://foo')
+ req.setApplicationServer('foo', port=80)
+ self.assertEquals(req._app_server, 'http://foo')
+ req.setApplicationServer('foo', proto='telnet', port=80)
+ self.assertEquals(req._app_server, 'telnet://foo:80')
+
+ def test_setApplicationNames(self):
+ req = self._createRequest()
+ names = ['x', 'y', 'z']
+ req.setVirtualHostRoot(names)
+ self.assertEquals(req._app_names, ['x', 'y', 'z'])
+ names[0] = 'muahahahaha'
+ self.assertEquals(req._app_names, ['x', 'y', 'z'])
+
+ def test_setVirtualHostRoot(self):
+ req = self._createRequest()
+ req._traversed_names = ['x', 'y']
+ req._last_obj_traversed = object()
+ req.setVirtualHostRoot()
+ self.failIf(req._traversed_names)
+ self.assertEquals(req._vh_root, req._last_obj_traversed)
+
+ def test_getVirtualHostRoot(self):
+ req = self._createRequest()
+ self.assertEquals(req.getVirtualHostRoot(), None)
+ req._vh_root = object()
+ self.assertEquals(req.getVirtualHostRoot(), req._vh_root)
+
+ def test_traverse(self):
+ req = self._createRequest()
+ req.traverse(self.app)
+ self.assertEquals(req._traversed_names, ['folder', 'item'])
+
+ # setting it during traversal matters
+ req = self._createRequest()
+ def hook(self, object, req=req, app=self.app):
+ if object is app.folder:
+ req.setVirtualHostRoot()
+ req.publication.callTraversalHooks = hook
+ req.traverse(self.app)
+ self.assertEquals(req._traversed_names, ['item'])
+ self.assertEquals(req._vh_root, self.app.folder)
+
+ def testInterface(self):
+ from zope.publisher.interfaces.http import IHTTPCredentials
+ from zope.publisher.interfaces.http import IHTTPApplicationRequest
+ rq = self._createRequest()
+ verifyObject(IHTTPRequest, rq)
+ verifyObject(IHTTPCredentials, rq)
+ verifyObject(IHTTPApplicationRequest, rq)
+
+ def testDeduceServerURL(self):
+ req = self._createRequest()
+ deduceServerURL = req._HTTPRequest__deduceServerURL
+ req._environ = {'HTTP_HOST': 'example.com:80'}
+ self.assertEquals(deduceServerURL(), 'http://example.com')
+ req._environ = {'HTTP_HOST': 'example.com:8080'}
+ self.assertEquals(deduceServerURL(), 'http://example.com:8080')
+ req._environ = {'HTTP_HOST': 'example.com:443', 'HTTPS': 'on'}
+ self.assertEquals(deduceServerURL(), 'https://example.com')
+ req._environ = {'HTTP_HOST': 'example.com:80', 'HTTPS': 'ON'}
+ self.assertEquals(deduceServerURL(), 'https://example.com:80')
+ req._environ = {'HTTP_HOST': 'example.com:8080',
+ 'SERVER_PORT_SECURE': '1'}
+ self.assertEquals(deduceServerURL(), 'https://example.com:8080')
+ req._environ = {'SERVER_NAME': 'example.com', 'SERVER_PORT':'8080',
+ 'SERVER_PORT_SECURE': '0'}
+ self.assertEquals(deduceServerURL(), 'http://example.com:8080')
+ req._environ = {'SERVER_NAME': 'example.com'}
+ self.assertEquals(deduceServerURL(), 'http://example.com')
+
+ def testUnicodeURLs(self):
+ req = self._createRequest(
+ {'PATH_INFO': '/%C3%A4%C3%B6/%C3%BC%C3%9F/foo/bar.html'})
+ self.assertEqual(req._traversal_stack,
+ [u'bar.html', u'foo', u'üß', u'äö'])
+
+
+class ConcreteHTTPTests(HTTPTests):
+ """Tests that we don't have to worry about subclasses inheriting and
+ breaking.
+ """
+
+ def test_shiftNameToApplication(self):
+ r = self._createRequest()
+ publish(r, handle_errors=0)
+ appurl = r.getApplicationURL()
+
+ # Verify that we can shift. It would be a little more realistic
+ # if we could test this during traversal, but the api doesn't
+ # let us do that.
+ r = self._createRequest(extra_env={"PATH_INFO": "/xxx"})
+ publish(r, handle_errors=0)
+ r.shiftNameToApplication()
+ self.assertEquals(r.getApplicationURL(), appurl+"/xxx")
+
+ # Verify that we can only shift if we've traversed only a single name
+ r = self._createRequest(extra_env={"PATH_INFO": "/folder/item"})
+ publish(r, handle_errors=0)
+ self.assertRaises(ValueError, r.shiftNameToApplication)
+
+
+
+class TestHTTPResponse(unittest.TestCase):
+
+ def testInterface(self):
+ rp = HTTPResponse(StringIO())
+ verifyObject(IHTTPResponse, rp)
+ verifyObject(IHTTPApplicationResponse, rp)
+ verifyObject(IResponse, rp)
+
+ def _createResponse(self):
+ stream = StringIO()
+ response = HTTPResponse(stream)
+ return response, stream
+
+ def _parseResult(self, result):
+ hdrs_text, body = result.split("\r\n\r\n", 1)
+ headers = {}
+ for line in hdrs_text.splitlines():
+ key, val = line.split(":", 1)
+ headers[key.strip()] = val.strip()
+ return headers, body
+
+ def _getResultFromResponse(self, body, charset=None, headers=None):
+ response, stream = self._createResponse()
+ if charset is not None:
+ response.setCharset()
+ if headers is not None:
+ for hdr, val in headers.iteritems():
+ response.setHeader(hdr, val)
+ response.setBody(body)
+ response.outputBody()
+ return self._parseResult(stream.getvalue())
+
+ def testContentLength(self):
+ eq = self.failUnlessEqual
+
+ headers, body = self._getResultFromResponse("test", "utf-8",
+ {"content-type": "text/plain"})
+ eq("4", headers["Content-Length"])
+ eq("test", body)
+
+ headers, body = self._getResultFromResponse(
+ u'\u0442\u0435\u0441\u0442', "utf-8",
+ {"content-type": "text/plain"})
+ eq("8", headers["Content-Length"])
+ eq('\xd1\x82\xd0\xb5\xd1\x81\xd1\x82', body)
+
+ def testContentType(self):
+ eq = self.failUnlessEqual
+
+ headers, body = self._getResultFromResponse("test", "utf-8")
+ eq("", headers.get("Content-Type", ""))
+ eq("test", body)
+
+ headers, body = self._getResultFromResponse("test",
+ headers={"content-type": "text/plain"})
+ eq("text/plain;charset=utf-8", headers["Content-Type"])
+ eq("test", body)
+
+ headers, body = self._getResultFromResponse("test", "utf-8",
+ {"content-type": "text/html"})
+ eq("text/html;charset=utf-8", headers["Content-Type"])
+ eq("test", body)
+
+ headers, body = self._getResultFromResponse("test", "utf-8",
+ {"content-type": "text/plain;charset=cp1251"})
+ eq("text/plain;charset=cp1251", headers["Content-Type"])
+ eq("test", body)
+
+ headers, body = self._getResultFromResponse("test", "utf-8",
+ {"content-type": "image/gif"})
+ eq("image/gif", headers["Content-Type"])
+ eq("test", body)
+
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(ConcreteHTTPTests))
+ suite.addTest(unittest.makeSuite(TestHTTPResponse))
+ return suite
+
+
+if __name__ == '__main__':
+ unittest.main()