summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTorsten Kurbad <github@tk-webart.de>2005-10-08 16:07:15 +0000
committerTorsten Kurbad <github@tk-webart.de>2005-10-08 16:07:15 +0000
commit48c9d02775699610c169bd19e017832769fe9f8c (patch)
tree3687a2895feae2b81456cf19b6f9dda61739099f
downloadzope-publisher-monolithic-zope3-zope3-twisted-merge.tar.gz
Merged from srichter-twisted-integration2 38370->38950monolithic-zope3-zope3-twisted-merge
-rw-r--r--base.py568
-rw-r--r--http.py1017
-rw-r--r--interfaces/__init__.py465
-rw-r--r--tests/basetestiapplicationrequest.py48
-rw-r--r--tests/test_baserequest.py96
-rw-r--r--tests/test_http.py633
6 files changed, 2827 insertions, 0 deletions
diff --git a/base.py b/base.py
new file mode 100644
index 0000000..826b50d
--- /dev/null
+++ b/base.py
@@ -0,0 +1,568 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Base implementations of the Publisher objects
+
+Specifically, 'BaseRequest', 'BaseResponse', and 'DefaultPublication' are
+specified here.
+
+$Id$
+"""
+import traceback
+from cStringIO import StringIO
+
+from zope.deprecation import deprecated
+
+from zope.interface import implements, providedBy
+from zope.interface.common.mapping import IReadMapping, IEnumerableMapping
+
+from zope.publisher.interfaces import IPublication, IHeld
+from zope.publisher.interfaces import NotFound, DebugError, Unauthorized
+from zope.publisher.interfaces import IRequest, IResponse, IDebugFlags
+from zope.publisher.publish import mapply
+
+_marker = object()
+
+class BaseResponse(object):
+ """Base Response Class
+ """
+
+ __slots__ = (
+ '_result', # The result of the application call
+ '_request', # The associated request (if any)
+ )
+
+ implements(IResponse)
+
+ def __init__(self, outstream=None):
+ self._request = None
+ # BBB: This is backward-compatibility support for the deprecated
+ # output stream.
+ if outstream is not None:
+ import warnings
+ warnings.warn("Can't pass output streams to requests anymore. "
+ "This will go away in Zope 3.4.",
+ DeprecationWarning,
+ 2)
+
+ def setResult(self, result):
+ 'See IPublisherResponse'
+ self._result = result
+
+ def handleException(self, exc_info):
+ 'See IPublisherResponse'
+ f = StringIO()
+ traceback.print_exception(
+ exc_info[0], exc_info[1], exc_info[2], 100, f)
+ self.setResult(f.getvalue())
+
+ def internalError(self):
+ 'See IPublisherResponse'
+ pass
+
+ def reset(self):
+ 'See IPublisherResponse'
+ pass
+
+ def retry(self):
+ 'See IPublisherResponse'
+ return self.__class__()
+
+ # BBB: Backward-compatibility for old body API
+ def setBody(self, body):
+ return self.setResult(body)
+ setBody = deprecated(
+ setBody,
+ 'setBody() has been deprecated in favor of setResult(). '
+ 'This will go away in Zope 3.4.')
+
+
+class RequestDataGetter(object):
+
+ implements(IReadMapping)
+
+ def __init__(self, request):
+ self.__get = getattr(request, self._gettrname)
+
+ def __getitem__(self, name):
+ return self.__get(name)
+
+ def get(self, name, default=None):
+ return self.__get(name, default)
+
+ def __contains__(self, key):
+ lookup = self.get(key, self)
+ return lookup is not self
+
+ has_key = __contains__
+
+class RequestDataMapper(object):
+
+ implements(IEnumerableMapping)
+
+ def __init__(self, request):
+ self.__map = getattr(request, self._mapname)
+
+ def __getitem__(self, name):
+ return self.__map[name]
+
+ def get(self, name, default=None):
+ return self.__map.get(name, default)
+
+ def __contains__(self, key):
+ lookup = self.get(key, self)
+ return lookup is not self
+
+ has_key = __contains__
+
+ def keys(self):
+ return self.__map.keys()
+
+ def __iter__(self):
+ return iter(self.keys())
+
+ def items(self):
+ return self.__map.items()
+
+ def values(self):
+ return self.__map.values()
+
+ def __len__(self):
+ return len(self.__map)
+
+class RequestDataProperty(object):
+
+ def __init__(self, gettr_class):
+ self.__gettr_class = gettr_class
+
+ def __get__(self, request, rclass=None):
+ if request is not None:
+ return self.__gettr_class(request)
+
+ def __set__(*args):
+ raise AttributeError('Unassignable attribute')
+
+
+class RequestEnvironment(RequestDataMapper):
+ _mapname = '_environ'
+
+
+class DebugFlags(object):
+ """Debugging flags."""
+
+ implements(IDebugFlags)
+
+ sourceAnnotations = False
+ showTAL = False
+
+
+class BaseRequest(object):
+ """Represents a publishing request.
+
+ This object provides access to request data. Request data may
+ vary depending on the protocol used.
+
+ Request objects are created by the object publisher and will be
+ passed to published objects through the argument name, REQUEST.
+
+ The request object is a mapping object that represents a
+ collection of variable to value mappings.
+ """
+
+ implements(IRequest)
+
+ __slots__ = (
+ '__provides__', # Allow request to directly provide interfaces
+ '_held', # Objects held until the request is closed
+ '_traversed_names', # The names that have been traversed
+ '_last_obj_traversed', # Object that was traversed last
+ '_traversal_stack', # Names to be traversed, in reverse order
+ '_environ', # The request environment variables
+ '_response', # The response
+ '_args', # positional arguments
+ '_body_instream', # input stream
+ '_body', # The request body as a string
+ '_publication', # publication object
+ '_principal', # request principal, set by publication
+ 'interaction', # interaction, set by interaction
+ 'debug', # debug flags
+ 'annotations', # per-package annotations
+ )
+
+ environment = RequestDataProperty(RequestEnvironment)
+
+ def __init__(self, body_instream, environ, response=None,
+ positional=None, outstream=None):
+
+ # BBB: This is backward-compatibility support for the deprecated
+ # output stream.
+ if not hasattr(environ, 'get'):
+ import warnings
+ warnings.warn("Can't pass output streams to requests anymore. "
+ "This will go away in Zope 3.4.",
+ DeprecationWarning,
+ 2)
+ environ, response, positional = response, positional, outstream
+
+
+ self._traversal_stack = []
+ self._last_obj_traversed = None
+ self._traversed_names = []
+ self._environ = environ
+
+ self._args = positional or ()
+
+ if response is None:
+ self._response = self._createResponse()
+ else:
+ self._response = response
+
+ self._response._request = self
+
+ self._body_instream = body_instream
+ self._held = ()
+ self._principal = None
+ self.debug = DebugFlags()
+ self.interaction = None
+ self.annotations = {}
+
+ def setPrincipal(self, principal):
+ self._principal = principal
+
+ principal = property(lambda self: self._principal)
+
+ def _getPublication(self):
+ 'See IPublisherRequest'
+ return getattr(self, '_publication', None)
+
+ publication = property(_getPublication)
+
+ def processInputs(self):
+ 'See IPublisherRequest'
+ # Nothing to do here
+
+ def retry(self):
+ 'See IPublisherRequest'
+ raise TypeError('Retry is not supported')
+
+ def setPublication(self, pub):
+ 'See IPublisherRequest'
+ self._publication = pub
+
+ def supportsRetry(self):
+ 'See IPublisherRequest'
+ return 0
+
+ def traverse(self, object):
+ 'See IPublisherRequest'
+
+ publication = self.publication
+
+ traversal_stack = self._traversal_stack
+ traversed_names = self._traversed_names
+
+ self._last_obj_traversed = object
+
+ prev_object = None
+ while True:
+
+ if object is not prev_object:
+ # Invoke hooks (but not more than once).
+ publication.callTraversalHooks(self, object)
+
+ prev_object = object
+
+ if traversal_stack:
+ # Traverse to the next step.
+ entry_name = traversal_stack.pop()
+ traversed_names.append(entry_name)
+ subobject = publication.traverseName(
+ self, object, entry_name)
+ self._last_obj_traversed = object = subobject
+ else:
+ # Finished traversal.
+ break
+
+ return object
+
+ def close(self):
+ 'See IPublicationRequest'
+
+ for held in self._held:
+ if IHeld.providedBy(held):
+ held.release()
+
+ self._held = None
+ self._body_instream = None
+ self._publication = None
+
+ def getPositionalArguments(self):
+ 'See IPublicationRequest'
+ return self._args
+
+ def _getResponse(self):
+ return self._response
+
+ response = property(_getResponse)
+
+ def getTraversalStack(self):
+ 'See IPublicationRequest'
+ return list(self._traversal_stack) # Return a copy
+
+ def hold(self, object):
+ 'See IPublicationRequest'
+ self._held = self._held + (object,)
+
+ def setTraversalStack(self, stack):
+ 'See IPublicationRequest'
+ self._traversal_stack[:] = list(stack)
+
+ def _getBodyStream(self):
+ 'See zope.publisher.interfaces.IApplicationRequest'
+ return self._body_instream
+
+ bodyStream = property(_getBodyStream)
+
+ ########################################################################
+ # BBB: Deprecated; will go away in Zope 3.4
+
+ def _getBody(self):
+ body = getattr(self, '_body', None)
+ if body is None:
+ s = self._body_instream
+ if s is None:
+ return None # TODO: what should be returned here?
+ body = s.read()
+ self._body = body
+ return body
+
+ body = property(_getBody)
+ body = deprecated(body,
+ 'The ``body`` attribute has been deprecated. Please '
+ 'use the ``bodyStream`` attribute directly. This '
+ 'attribute will go away in Zope 3.4.')
+
+ bodyFile = bodyStream
+ bodyFile = deprecated(bodyFile,
+ 'The ``bodyFile`` attribute has been replaced by '
+ '``bodyStream``, which is a more accurate name. '
+ 'Streams are not necessarily files, i.e. they are '
+ 'not seekable. This attribute will go away in Zope '
+ '3.4.')
+
+ ########################################################################
+
+ def __len__(self):
+ 'See Interface.Common.Mapping.IEnumerableMapping'
+ return len(self.keys())
+
+ def items(self):
+ 'See Interface.Common.Mapping.IEnumerableMapping'
+ result = []
+ get = self.get
+ for k in self.keys():
+ result.append((k, get(k)))
+ return result
+
+ def keys(self):
+ 'See Interface.Common.Mapping.IEnumerableMapping'
+ return self._environ.keys()
+
+ def __iter__(self):
+ return iter(self.keys())
+
+ def values(self):
+ 'See Interface.Common.Mapping.IEnumerableMapping'
+ result = []
+ get = self.get
+ for k in self.keys():
+ result.append(get(k))
+ return result
+
+ def __getitem__(self, key):
+ 'See Interface.Common.Mapping.IReadMapping'
+ result = self.get(key, _marker)
+ if result is _marker:
+ raise KeyError(key)
+ else:
+ return result
+
+ def get(self, key, default=None):
+ 'See Interface.Common.Mapping.IReadMapping'
+
+ result = self._environ.get(key, self)
+ if result is not self: return result
+
+ return default
+
+ def __contains__(self, key):
+ 'See Interface.Common.Mapping.IReadMapping'
+ lookup = self.get(key, self)
+ return lookup is not self
+
+ has_key = __contains__
+
+ def _createResponse(self):
+ # Should be overridden by subclasses
+ return BaseResponse()
+
+ def __nonzero__(self):
+ # This is here to avoid calling __len__ for boolean tests
+ return 1
+
+ def __str__(self):
+ L1 = self.items()
+ L1.sort()
+ return "\n".join(map(lambda item: "%s:\t%s" % item, L1))
+
+ def _setupPath_helper(self, attr):
+ path = self.get(attr, "/").strip()
+ if path.endswith('/'):
+ # Remove trailing backslash, so that we will not get an empty
+ # last entry when splitting the path.
+ path = path[:-1]
+ self._endswithslash = True
+ else:
+ self._endswithslash = False
+
+ clean = []
+ for item in path.split('/'):
+ if not item or item == '.':
+ continue
+ elif item == '..':
+ try:
+ del clean[-1]
+ except IndexError:
+ raise NotFound('..')
+ else: clean.append(item)
+
+ clean.reverse()
+ self.setTraversalStack(clean)
+
+ self._path_suffix = None
+
+class TestRequest(BaseRequest):
+
+ __slots__ = ('_presentation_type', )
+
+ def __init__(self, path, body_instream=None, environ=None, outstream=None):
+
+ # BBB: This is backward-compatibility support for the deprecated
+ # output stream.
+ if environ is None:
+ environ = {}
+ else:
+ if not hasattr(environ, 'get'):
+ import warnings
+ warnings.warn("Can't pass output streams to requests anymore. "
+ "This will go away in Zope 3.4.",
+ DeprecationWarning,
+ 2)
+ environ, outstream = outstream, environ
+
+ environ['PATH_INFO'] = path
+ if body_instream is None:
+ body_instream = StringIO('')
+
+ super(TestRequest, self).__init__(body_instream, environ)
+ self.response._outstream = outstream
+
+ def _createResponse(self):
+ return BBBResponse()
+
+class BBBResponse(BaseResponse):
+
+ def outputBody(self):
+ import warnings
+ warnings.warn("Can't pass output streams to requests anymore",
+ DeprecationWarning,
+ 2)
+ self._outstream.write(self._result)
+
+class DefaultPublication(object):
+ """A stub publication.
+
+ This works just like Zope2's ZPublisher. It rejects any name
+ starting with an underscore and any objects (specifically: method)
+ that doesn't have a docstring.
+ """
+ implements(IPublication)
+
+ require_docstrings = True
+
+ def __init__(self, app):
+ self.app = app
+
+ def beforeTraversal(self, request):
+ # Lop off leading and trailing empty names
+ stack = request.getTraversalStack()
+ while stack and not stack[-1]:
+ stack.pop() # toss a trailing empty name
+ while stack and not stack[0]:
+ stack.pop(0) # toss a leading empty name
+ request.setTraversalStack(stack)
+
+ def getApplication(self, request):
+ return self.app
+
+ def callTraversalHooks(self, request, ob):
+ pass
+
+ def traverseName(self, request, ob, name, check_auth=1):
+ if name.startswith('_'):
+ raise Unauthorized(name)
+ if hasattr(ob, name):
+ subob = getattr(ob, name)
+ else:
+ try:
+ subob = ob[name]
+ except (KeyError, IndexError,
+ TypeError, AttributeError):
+ raise NotFound(ob, name, request)
+ if self.require_docstrings and not getattr(subob, '__doc__', None):
+ raise DebugError(subob, 'Missing or empty doc string')
+ return subob
+
+ def getDefaultTraversal(self, request, ob):
+ return ob, ()
+
+ def afterTraversal(self, request, ob):
+ pass
+
+ def callObject(self, request, ob):
+ return mapply(ob, request.getPositionalArguments(), request)
+
+ def afterCall(self, request, ob):
+ pass
+
+ def endRequest(self, request, ob):
+ pass
+
+ def handleException(self, object, request, exc_info, retry_allowed=1):
+ # Let the response handle it as best it can.
+ request.response.reset()
+ request.response.handleException(exc_info)
+
+
+class TestPublication(DefaultPublication):
+
+ def traverseName(self, request, ob, name, check_auth=1):
+ if hasattr(ob, name):
+ subob = getattr(ob, name)
+ else:
+ try:
+ subob = ob[name]
+ except (KeyError, IndexError,
+ TypeError, AttributeError):
+ raise NotFound(ob, name, request)
+ return subob
diff --git a/http.py b/http.py
new file mode 100644
index 0000000..c2f6fc1
--- /dev/null
+++ b/http.py
@@ -0,0 +1,1017 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""HTTP Publisher
+
+$Id$
+"""
+import re, time, random
+from urllib import quote, unquote, splitport
+from types import StringTypes, ClassType
+from cgi import escape
+from Cookie import SimpleCookie
+from Cookie import CookieError
+import logging
+from tempfile import TemporaryFile
+
+from zope.deprecation import deprecation
+from zope.interface import implements
+
+from zope.publisher import contenttype
+from zope.publisher.interfaces.http import IHTTPCredentials
+from zope.publisher.interfaces.http import IHTTPRequest
+from zope.publisher.interfaces.http import IHTTPApplicationRequest
+from zope.publisher.interfaces.http import IHTTPPublisher
+
+from zope.publisher.interfaces import Redirect
+from zope.publisher.interfaces.http import IHTTPResponse, IResult
+from zope.publisher.interfaces.http import IHTTPApplicationResponse
+from zope.publisher.interfaces.logginginfo import ILoggingInfo
+from zope.i18n.interfaces import IUserPreferredCharsets
+from zope.i18n.interfaces import IUserPreferredLanguages
+from zope.i18n.locales import locales, LoadLocaleError
+
+from zope.publisher import contenttype
+from zope.publisher.base import BaseRequest, BaseResponse
+from zope.publisher.base import RequestDataProperty, RequestDataMapper
+from zope.publisher.base import RequestDataGetter
+
+
+# Default Encoding
+ENCODING = 'UTF-8'
+
+class CookieMapper(RequestDataMapper):
+ _mapname = '_cookies'
+
+class HeaderGetter(RequestDataGetter):
+ _gettrname = 'getHeader'
+
+base64 = None
+
+def sane_environment(env):
+ # return an environment mapping which has been cleaned of
+ # funny business such as REDIRECT_ prefixes added by Apache
+ # or HTTP_CGI_AUTHORIZATION hacks.
+ # It also makes sure PATH_INFO is a unicode string.
+ dict = {}
+ for key, val in env.items():
+ while key.startswith('REDIRECT_'):
+ key = key[9:]
+ dict[key] = val
+ if 'HTTP_CGI_AUTHORIZATION' in dict:
+ dict['HTTP_AUTHORIZATION'] = dict.pop('HTTP_CGI_AUTHORIZATION')
+ if 'PATH_INFO' in dict:
+ dict['PATH_INFO'] = dict['PATH_INFO'].decode('utf-8')
+ return dict
+
+# Possible HTTP status responses
+status_reasons = {
+100: 'Continue',
+101: 'Switching Protocols',
+102: 'Processing',
+200: 'OK',
+201: 'Created',
+202: 'Accepted',
+203: 'Non-Authoritative Information',
+204: 'No Content',
+205: 'Reset Content',
+206: 'Partial Content',
+207: 'Multi-Status',
+300: 'Multiple Choices',
+301: 'Moved Permanently',
+302: 'Moved Temporarily',
+303: 'See Other',
+304: 'Not Modified',
+305: 'Use Proxy',
+307: 'Temporary Redirect',
+400: 'Bad Request',
+401: 'Unauthorized',
+402: 'Payment Required',
+403: 'Forbidden',
+404: 'Not Found',
+405: 'Method Not Allowed',
+406: 'Not Acceptable',
+407: 'Proxy Authentication Required',
+408: 'Request Time-out',
+409: 'Conflict',
+410: 'Gone',
+411: 'Length Required',
+412: 'Precondition Failed',
+413: 'Request Entity Too Large',
+414: 'Request-URI Too Large',
+415: 'Unsupported Media Type',
+416: 'Requested range not satisfiable',
+417: 'Expectation Failed',
+422: 'Unprocessable Entity',
+423: 'Locked',
+424: 'Failed Dependency',
+500: 'Internal Server Error',
+501: 'Not Implemented',
+502: 'Bad Gateway',
+503: 'Service Unavailable',
+504: 'Gateway Time-out',
+505: 'HTTP Version not supported',
+507: 'Insufficient Storage',
+}
+
+status_codes={}
+
+def init_status_codes():
+ # Add mappings for builtin exceptions and
+ # provide text -> error code lookups.
+ for key, val in status_reasons.items():
+ status_codes[val.replace(' ', '').lower()] = key
+ status_codes[val.lower()] = key
+ status_codes[key] = key
+ status_codes[str(key)] = key
+
+ en = [n.lower() for n in dir(__builtins__) if n.endswith('Error')]
+
+ for name in en:
+ status_codes[name] = 500
+
+init_status_codes()
+
+
+class URLGetter(object):
+
+ __slots__ = "__request"
+
+ def __init__(self, request):
+ self.__request = request
+
+ def __str__(self):
+ return self.__request.getURL()
+
+ def __getitem__(self, name):
+ url = self.get(name, None)
+ if url is None:
+ raise KeyError(name)
+ return url
+
+ def get(self, name, default=None):
+ i = int(name)
+ try:
+ if i < 0:
+ i = -i
+ return self.__request.getURL(i)
+ else:
+ return self.__request.getApplicationURL(i)
+ except IndexError, v:
+ if v[0] == i:
+ return default
+ raise
+
+class HTTPInputStream(object):
+ """Special stream that supports caching the read data.
+
+ This is important, so that we can retry requests.
+ """
+
+ def __init__(self, stream):
+ self.stream = stream
+ self.cacheStream = TemporaryFile()
+
+ def getCacheStream(self):
+ self.read()
+ self.cacheStream.seek(0)
+ return self.cacheStream
+
+ def read(self, size=-1):
+ data = self.stream.read(size)
+ self.cacheStream.write(data)
+ return data
+
+ def readline(self):
+ data = self.stream.readline()
+ self.cacheStream.write(data)
+ return data
+
+ def readlines(self, hint=None):
+ data = self.stream.readlines(hint)
+ self.cacheStream.write(''.join(data))
+ return data
+
+
+DEFAULT_PORTS = {'http': '80', 'https': '443'}
+STAGGER_RETRIES = True
+
+class HTTPRequest(BaseRequest):
+ """Model HTTP request data.
+
+ This object provides access to request data. This includes, the
+ input headers, form data, server data, and cookies.
+
+ Request objects are created by the object publisher and will be
+ passed to published objects through the argument name, REQUEST.
+
+ The request object is a mapping object that represents a
+ collection of variable to value mappings. In addition, variables
+ are divided into four categories:
+
+ - Environment variables
+
+ These variables include input headers, server data, and other
+ request-related data. The variable names are as <a
+ href="http://hoohoo.ncsa.uiuc.edu/cgi/env.html">specified</a>
+ in the <a
+ href="http://hoohoo.ncsa.uiuc.edu/cgi/interface.html">CGI
+ specification</a>
+
+ - Form data
+
+ These are data extracted from either a URL-encoded query
+ string or body, if present.
+
+ - Cookies
+
+ These are the cookie data, if present.
+
+ - Other
+
+ Data that may be set by an application object.
+
+ The form attribute of a request is actually a Field Storage
+ object. When file uploads are used, this provides a richer and
+ more complex interface than is provided by accessing form data as
+ items of the request. See the FieldStorage class documentation
+ for more details.
+
+ The request object may be used as a mapping object, in which case
+ values will be looked up in the order: environment variables,
+ other variables, form data, and then cookies.
+ """
+ implements(IHTTPCredentials, IHTTPRequest, IHTTPApplicationRequest)
+
+ __slots__ = (
+ '__provides__', # Allow request to directly provide interfaces
+ '_auth', # The value of the HTTP_AUTHORIZATION header.
+ '_cookies', # The request cookies
+ '_path_suffix', # Extra traversal steps after normal traversal
+ '_retry_count', # How many times the request has been retried
+ '_app_names', # The application path as a sequence
+ '_app_server', # The server path of the application url
+ '_orig_env', # The original environment
+ '_endswithslash', # Does the given path end with /
+ 'method', # The upper-cased request method (REQUEST_METHOD)
+ '_locale', # The locale for the request
+ '_vh_root', # Object at the root of the virtual host
+ )
+
+ retry_max_count = 3 # How many times we're willing to retry
+
+ def __init__(self, body_instream, environ, response=None, outstream=None):
+ # BBB: This is backward-compatibility support for the deprecated
+ # output stream.
+ try:
+ environ.get
+ except AttributeError:
+ import warnings
+ warnings.warn("Can't pass output streams to requests anymore. "
+ "This will go away in Zope 3.4.",
+ DeprecationWarning,
+ 2)
+ environ, response = response, outstream
+
+ super(HTTPRequest, self).__init__(
+ HTTPInputStream(body_instream), environ, response)
+
+ self._orig_env = environ
+ environ = sane_environment(environ)
+
+ if 'HTTP_AUTHORIZATION' in environ:
+ self._auth = environ['HTTP_AUTHORIZATION']
+ del environ['HTTP_AUTHORIZATION']
+ else:
+ self._auth = None
+
+ self.method = environ.get("REQUEST_METHOD", 'GET').upper()
+
+ self._environ = environ
+
+ self.__setupCookies()
+ self.__setupPath()
+ self.__setupURLBase()
+ self._vh_root = None
+ self.__setupLocale()
+
+ def __setupLocale(self):
+ envadapter = IUserPreferredLanguages(self, None)
+ if envadapter is None:
+ self._locale = None
+ return
+
+ langs = envadapter.getPreferredLanguages()
+ for httplang in langs:
+ parts = (httplang.split('-') + [None, None])[:3]
+ try:
+ self._locale = locales.getLocale(*parts)
+ return
+ except LoadLocaleError:
+ # Just try the next combination
+ pass
+ else:
+ # No combination gave us an existing locale, so use the default,
+ # which is guaranteed to exist
+ self._locale = locales.getLocale(None, None, None)
+
+ def _getLocale(self):
+ return self._locale
+ locale = property(_getLocale)
+
+ def __setupURLBase(self):
+ get_env = self._environ.get
+ # Get base info first. This isn't likely to cause
+ # errors and might be useful to error handlers.
+ script = get_env('SCRIPT_NAME', '').strip()
+
+ # _script and the other _names are meant for URL construction
+ self._app_names = filter(None, script.split('/'))
+
+ # get server URL and store it too, since we are already looking it up
+ server_url = get_env('SERVER_URL', None)
+ if server_url is not None:
+ self._app_server = server_url = server_url.strip()
+ else:
+ server_url = self.__deduceServerURL()
+
+ if server_url.endswith('/'):
+ server_url = server_url[:-1]
+
+ # strip off leading /'s of script
+ while script.startswith('/'):
+ script = script[1:]
+
+ self._app_server = server_url
+
+ def __deduceServerURL(self):
+ environ = self._environ
+
+ if (environ.get('HTTPS', '').lower() == "on" or
+ environ.get('SERVER_PORT_SECURE') == "1"):
+ protocol = 'https'
+ else:
+ protocol = 'http'
+
+ if environ.has_key('HTTP_HOST'):
+ host = environ['HTTP_HOST'].strip()
+ hostname, port = splitport(host)
+ else:
+ hostname = environ.get('SERVER_NAME', '').strip()
+ port = environ.get('SERVER_PORT', '')
+
+ if port and port != DEFAULT_PORTS.get(protocol):
+ host = hostname + ':' + port
+ else:
+ host = hostname
+
+ return '%s://%s' % (protocol, host)
+
+ def _parseCookies(self, text, result=None):
+ """Parse 'text' and return found cookies as 'result' dictionary."""
+
+ if result is None:
+ result = {}
+
+ # ignore cookies on a CookieError
+ try:
+ c = SimpleCookie(text)
+ except CookieError, e:
+ log = logging.getLogger('eventlog')
+ log.warn(e)
+ return result
+
+ for k,v in c.items():
+ result[unicode(k, ENCODING)] = unicode(v.value, ENCODING)
+
+ return result
+
+ def __setupCookies(self):
+ # Cookie values should *not* be appended to existing form
+ # vars with the same name - they are more like default values
+ # for names not otherwise specified in the form.
+ self._cookies = {}
+ cookie_header = self._environ.get('HTTP_COOKIE', None)
+ if cookie_header is not None:
+ self._parseCookies(cookie_header, self._cookies)
+
+ def __setupPath(self):
+ # PATH_INFO is unicode here, so setupPath_helper sets up the
+ # traversal stack correctly.
+ self._setupPath_helper("PATH_INFO")
+
+ def supportsRetry(self):
+ 'See IPublisherRequest'
+ count = getattr(self, '_retry_count', 0)
+ if count < self.retry_max_count:
+ if STAGGER_RETRIES:
+ time.sleep(random.uniform(0, 2**(count)))
+ return True
+
+ def retry(self):
+ 'See IPublisherRequest'
+ count = getattr(self, '_retry_count', 0)
+ self._retry_count = count + 1
+
+ new_response = self.response.retry()
+ request = self.__class__(
+ # Use the cache stream as the new input stream.
+ body_instream=self._body_instream.getCacheStream(),
+ environ=self._orig_env,
+ response=new_response,
+ )
+ request.setPublication(self.publication)
+ request._retry_count = self._retry_count
+ return request
+
+ def traverse(self, object):
+ 'See IPublisherRequest'
+
+ ob = super(HTTPRequest, self).traverse(object)
+ if self._path_suffix:
+ self._traversal_stack = self._path_suffix
+ ob = super(HTTPRequest, self).traverse(ob)
+
+ return ob
+
+ def getHeader(self, name, default=None, literal=False):
+ 'See IHTTPRequest'
+ environ = self._environ
+ if not literal:
+ name = name.replace('-', '_').upper()
+ val = environ.get(name, None)
+ if val is not None:
+ return val
+ if not name.startswith('HTTP_'):
+ name='HTTP_%s' % name
+ return environ.get(name, default)
+
+ headers = RequestDataProperty(HeaderGetter)
+
+ def getCookies(self):
+ 'See IHTTPApplicationRequest'
+ return self._cookies
+
+ cookies = RequestDataProperty(CookieMapper)
+
+ def setPathSuffix(self, steps):
+ 'See IHTTPRequest'
+ steps = list(steps)
+ steps.reverse()
+ self._path_suffix = steps
+
+ def _authUserPW(self):
+ 'See IHTTPCredentials'
+ global base64
+ if self._auth:
+ if self._auth.lower().startswith('basic '):
+ if base64 is None:
+ import base64
+ name, password = base64.decodestring(
+ self._auth.split()[-1]).split(':')
+ return name, password
+
+ def unauthorized(self, challenge):
+ 'See IHTTPCredentials'
+ self._response.setHeader("WWW-Authenticate", challenge, True)
+ self._response.setStatus(401)
+
+ def setPrincipal(self, principal):
+ 'See IPublicationRequest'
+ super(HTTPRequest, self).setPrincipal(principal)
+ logging_info = ILoggingInfo(principal, None)
+ if logging_info is None:
+ message = '-'
+ else:
+ message = logging_info.getLogMessage()
+ self.response.authUser = message
+
+ def _createResponse(self):
+ # Should be overridden by subclasses
+ return HTTPResponse()
+
+
+ def getURL(self, level=0, path_only=False):
+ names = self._app_names + self._traversed_names
+ if level:
+ if level > len(names):
+ raise IndexError(level)
+ names = names[:-level]
+ # See: http://www.ietf.org/rfc/rfc2718.txt, Section 2.2.5
+ names = [quote(name.encode("utf-8"), safe='/+@') for name in names]
+
+ if path_only:
+ if not names:
+ return '/'
+ return '/' + '/'.join(names)
+ else:
+ if not names:
+ return self._app_server
+ return "%s/%s" % (self._app_server, '/'.join(names))
+
+ def getApplicationURL(self, depth=0, path_only=False):
+ """See IHTTPApplicationRequest"""
+ if depth:
+ names = self._traversed_names
+ if depth > len(names):
+ raise IndexError(depth)
+ names = self._app_names + names[:depth]
+ else:
+ names = self._app_names
+
+ # See: http://www.ietf.org/rfc/rfc2718.txt, Section 2.2.5
+ names = [quote(name.encode("utf-8"), safe='/+@') for name in names]
+
+ if path_only:
+ return names and ('/' + '/'.join(names)) or '/'
+ else:
+ return (names and ("%s/%s" % (self._app_server, '/'.join(names)))
+ or self._app_server)
+
+ def setApplicationServer(self, host, proto='http', port=None):
+ if port and str(port) != DEFAULT_PORTS.get(proto):
+ host = '%s:%s' % (host, port)
+ self._app_server = '%s://%s' % (proto, host)
+
+ def shiftNameToApplication(self):
+ """Add the name being traversed to the application name
+
+ This is only allowed in the case where the name is the first name.
+
+ A Value error is raise if the shift can't be performed.
+ """
+ if len(self._traversed_names) == 1:
+ self._app_names.append(self._traversed_names.pop())
+ return
+
+ raise ValueError("Can only shift leading traversal "
+ "names to application names")
+
+ def setVirtualHostRoot(self, names=()):
+ del self._traversed_names[:]
+ self._vh_root = self._last_obj_traversed
+ self._app_names = list(names)
+
+ def getVirtualHostRoot(self):
+ return self._vh_root
+
+ URL = RequestDataProperty(URLGetter)
+
+ def __repr__(self):
+ # Returns a *short* string.
+ return '<%s.%s instance URL=%s>' % (
+ self.__class__.__module__, self.__class__.__name__, str(self.URL))
+
+ def get(self, key, default=None):
+ 'See Interface.Common.Mapping.IReadMapping'
+
+ result = self._cookies.get(key, self)
+ if result is not self: return result
+
+ result = self._environ.get(key, self)
+ if result is not self: return result
+
+ return default
+
+ def keys(self):
+ 'See Interface.Common.Mapping.IEnumerableMapping'
+ d = {}
+ d.update(self._environ)
+ d.update(self._cookies)
+ return d.keys()
+
+
+class HTTPResponse(BaseResponse):
+ implements(IHTTPResponse, IHTTPApplicationResponse)
+
+ __slots__ = (
+ 'authUser', # Authenticated user string
+ # BBB: Remove for Zope 3.4.
+ '_header_output', # Hook object to collaborate with a server
+ # for header generation.
+ '_headers',
+ '_cookies',
+ '_status', # The response status (usually an integer)
+ '_reason', # The reason that goes with the status
+ '_status_set', # Boolean: status explicitly set
+ '_charset', # String: character set for the output
+ )
+
+
+ def __init__(self, header_output=None, http_transaction=None):
+ # BBB: Both, header_output and http_transaction have been deprecated.
+ if header_output is not None:
+ import warnings
+ warnings.warn(
+ "The header output API is completely deprecated. It's "
+ "intentions were not clear and it duplicated APIs in the "
+ "response, which you should use instead. "
+ "This will go away in Zope 3.4.",
+ DeprecationWarning, 2)
+
+ if http_transaction is not None:
+ import warnings
+ warnings.warn(
+ "Storing the HTTP transaction here was a *huge* hack to "
+ "support transporting the authenticated user string "
+ "to the server. You should never rely on this variable "
+ "anyways. "
+ "This will go away in Zope 3.4.",
+ DeprecationWarning, 2)
+
+ self._header_output = header_output
+
+ super(HTTPResponse, self).__init__()
+ self.reset()
+
+
+ def reset(self):
+ 'See IResponse'
+ super(HTTPResponse, self).reset()
+ self._headers = {}
+ self._cookies = {}
+ self._status = 599
+ self._reason = 'No status set'
+ self._status_set = False
+ self._charset = None
+ self.authUser = '-'
+
+ def setStatus(self, status, reason=None):
+ 'See IHTTPResponse'
+ if status is None:
+ status = 200
+ else:
+ if type(status) in StringTypes:
+ status = status.lower()
+ if status in status_codes:
+ status = status_codes[status]
+ else:
+ status = 500
+ self._status = status
+
+ if reason is None:
+ if status == 200:
+ reason = 'Ok'
+ elif status in status_reasons:
+ reason = status_reasons[status]
+ else:
+ reason = 'Unknown'
+ self._reason = reason
+ self._status_set = True
+
+
+ def getStatus(self):
+ 'See IHTTPResponse'
+ return self._status
+
+ def getStatusString(self):
+ 'See IHTTPResponse'
+ return '%i %s' % (self._status, self._reason)
+
+ def setHeader(self, name, value, literal=False):
+ 'See IHTTPResponse'
+ name = str(name)
+ value = str(value)
+
+ if not literal:
+ name = name.lower()
+
+ self._headers[name] = [value]
+
+
+ def addHeader(self, name, value):
+ 'See IHTTPResponse'
+ values = self._headers.setdefault(name, [])
+ values.append(value)
+
+
+ def getHeader(self, name, default=None, literal=False):
+ 'See IHTTPResponse'
+ key = name.lower()
+ name = literal and name or key
+ result = self._headers.get(name)
+ if result:
+ return result[0]
+ return default
+
+
+ def getHeaders(self):
+ 'See IHTTPResponse'
+ result = []
+ headers = self._headers
+
+ result.append(
+ ("X-Powered-By", "Zope (www.zope.org), Python (www.python.org)"))
+
+ for key, values in headers.items():
+ if key.lower() == key:
+ # only change non-literal header names
+ key = '-'.join([k.capitalize() for k in key.split('-')])
+ result.extend([(key, val) for val in values])
+
+ result.extend([tuple(cookie.split(': ', 1))
+ for cookie in self._cookie_list()])
+
+ return result
+
+
+ def appendToCookie(self, name, value):
+ 'See IHTTPResponse'
+ cookies = self._cookies
+ if name in cookies:
+ cookie = cookies[name]
+ else:
+ cookie = cookies[name] = {}
+ if 'value' in cookie:
+ cookie['value'] = '%s:%s' % (cookie['value'], value)
+ else:
+ cookie['value'] = value
+
+
+ def expireCookie(self, name, **kw):
+ 'See IHTTPResponse'
+ dict = {'max_age':0, 'expires':'Wed, 31-Dec-97 23:59:59 GMT'}
+ for k, v in kw.items():
+ if v is not None:
+ dict[k] = v
+ cookies = self._cookies
+ if name in cookies:
+ # Cancel previous setCookie().
+ del cookies[name]
+ self.setCookie(name, 'deleted', **dict)
+
+
+ def setCookie(self, name, value, **kw):
+ 'See IHTTPResponse'
+ cookies = self._cookies
+ cookie = cookies.setdefault(name, {})
+
+ for k, v in kw.items():
+ if v is not None:
+ cookie[k.lower()] = v
+
+ cookie['value'] = value
+
+
+ def getCookie(self, name, default=None):
+ 'See IHTTPResponse'
+ return self._cookies.get(name, default)
+
+
+ def setResult(self, result):
+ r = IResult(result, None)
+ if r is None:
+ if isinstance(result, basestring):
+ body, headers = self._implicitResult(result)
+ r = DirectResult((body,), headers)
+ elif result is None:
+ body, headers = self._implicitResult('')
+ r = DirectResult((body,), headers)
+ else:
+ raise TypeError('The result should be adaptable to IResult.')
+ self._result = r
+ self._headers.update(dict([(k, [v]) for (k, v) in r.headers]))
+ if not self._status_set:
+ self.setStatus(200)
+
+
+ def consumeBody(self):
+ 'See IHTTPResponse'
+ return ''.join(self._result.body)
+
+
+ def consumeBodyIter(self):
+ 'See IHTTPResponse'
+ return self._result.body
+
+
+ # BBB: Backward-compatibility for old body API
+ _body = property(consumeBody)
+ _body = deprecation.deprecated(
+ _body,
+ '`_body` has been deprecated in favor of `consumeBody()`. '
+ 'This will go away in Zope 3.4.')
+
+
+ def _implicitResult(self, body):
+ encoding = getCharsetUsingRequest(self._request) or 'utf-8'
+ content_type = self.getHeader('content-type')
+
+ if isinstance(body, unicode):
+ try:
+ if not content_type.startswith('text/'):
+ raise ValueError(
+ 'Unicode results must have a text content type.')
+ except AttributeError:
+ raise ValueError(
+ 'Unicode results must have a text content type.')
+
+
+ major, minor, params = contenttype.parse(content_type)
+
+ if 'charset' in params:
+ encoding = params['charset']
+ else:
+ content_type += ';charset=%s' %encoding
+
+ body = body.encode(encoding)
+
+ if content_type:
+ headers = [('content-type', content_type),
+ ('content-length', str(len(body)))]
+ else:
+ headers = [('content-length', str(len(body)))]
+
+ return body, headers
+
+
+ def handleException(self, exc_info):
+ """
+ Calls self.setBody() with an error response.
+ """
+ t, v = exc_info[:2]
+ if isinstance(t, ClassType):
+ if issubclass(t, Redirect):
+ self.redirect(v.getLocation())
+ return
+ title = tname = t.__name__
+ else:
+ title = tname = unicode(t)
+
+ # Throwing non-protocol-specific exceptions is a good way
+ # for apps to control the status code.
+ self.setStatus(tname)
+
+ body = self._html(title, "A server error occurred." )
+ self.setResult(body)
+
+
+ def internalError(self):
+ 'See IPublisherResponse'
+ self.setStatus(500, u"The engines can't take any more, Jim!")
+
+
+ def _html(self, title, content):
+ t = escape(title)
+ return (
+ u"<html><head><title>%s</title></head>\n"
+ u"<body><h2>%s</h2>\n"
+ u"%s\n"
+ u"</body></html>\n" %
+ (t, t, content)
+ )
+
+
+ def retry(self):
+ """
+ Returns a response object to be used in a retry attempt
+ """
+ return self.__class__()
+
+
+ def redirect(self, location, status=None):
+ """Causes a redirection without raising an error"""
+ if status is None:
+ # parse the HTTP version and set default accordingly
+ if (self._request.get("SERVER_PROTOCOL","HTTP/1.0") <
+ "HTTP/1.1"):
+ status=302
+ else:
+ status=303
+
+ self.setStatus(status)
+ self.setHeader('Location', location)
+ self.setResult(DirectResult(()))
+ return location
+
+ def _cookie_list(self):
+ try:
+ c = SimpleCookie()
+ except CookieError, e:
+ log = logging.getLogger('eventlog')
+ log.warn(e)
+ return []
+ for name, attrs in self._cookies.items():
+ name = str(name)
+ c[name] = attrs['value'].encode(ENCODING)
+ for k,v in attrs.items():
+ if k == 'value':
+ continue
+ if k == 'secure':
+ if v:
+ c[name]['secure'] = True
+ continue
+ if k == 'max_age':
+ k = 'max-age'
+ elif k == 'comment':
+ # Encode rather than throw an exception
+ v = quote(v.encode('utf-8'), safe="/?:@&+")
+ c[name][k] = str(v)
+ return str(c).splitlines()
+
+
+def sort_charsets(x, y):
+ if y[1] == 'utf-8':
+ return 1
+ if x[1] == 'utf-8':
+ return -1
+ return cmp(y, x)
+
+
+class HTTPCharsets(object):
+ implements(IUserPreferredCharsets)
+
+ def __init__(self, request):
+ self.request = request
+
+ def getPreferredCharsets(self):
+ '''See interface IUserPreferredCharsets'''
+ charsets = []
+ sawstar = sawiso88591 = 0
+ header_present = 'HTTP_ACCEPT_CHARSET' in self.request
+ for charset in self.request.get('HTTP_ACCEPT_CHARSET', '').split(','):
+ charset = charset.strip().lower()
+ if charset:
+ if ';' in charset:
+ charset, quality = charset.split(';')
+ if not quality.startswith('q='):
+ # not a quality parameter
+ quality = 1.0
+ else:
+ try:
+ quality = float(quality[2:])
+ except ValueError:
+ continue
+ else:
+ quality = 1.0
+ if quality == 0.0:
+ continue
+ if charset == '*':
+ sawstar = 1
+ if charset == 'iso-8859-1':
+ sawiso88591 = 1
+ charsets.append((quality, charset))
+ # Quoting RFC 2616, $14.2: If no "*" is present in an Accept-Charset
+ # field, then all character sets not explicitly mentioned get a
+ # quality value of 0, except for ISO-8859-1, which gets a quality
+ # value of 1 if not explicitly mentioned.
+ # And quoting RFC 2616, $14.2: "If no Accept-Charset header is
+ # present, the default is that any character set is acceptable."
+ if not sawstar and not sawiso88591 and header_present:
+ charsets.append((1.0, 'iso-8859-1'))
+ # UTF-8 is **always** preferred over anything else.
+ # Reason: UTF-8 is not specific and can encode the entire unicode
+ # range , unlike many other encodings. Since Zope can easily use very
+ # different ranges, like providing a French-Chinese dictionary, it is
+ # always good to use UTF-8.
+ charsets.sort(sort_charsets)
+ return [c[1] for c in charsets]
+
+
+def getCharsetUsingRequest(request):
+ 'See IHTTPResponse'
+ envadapter = IUserPreferredCharsets(request, None)
+ if envadapter is None:
+ return
+
+ try:
+ charset = envadapter.getPreferredCharsets()[0]
+ except IndexError:
+ # Exception caused by empty list! This is okay though, since the
+ # browser just could have sent a '*', which means we can choose
+ # the encoding, which we do here now.
+ charset = 'utf-8'
+ return charset
+
+
+class DirectResult(object):
+ """A generic result object.
+
+ The result's body can be any iteratable. It is the responsibility of the
+ application to specify all headers related to the content, such as the
+ content type and length.
+ """
+ implements(IResult)
+
+ def __init__(self, body, headers=()):
+ self.body = body
+ self.headers = headers
+
+
+def StrResult(body, headers=()):
+ """A simple string result that represents any type of data.
+
+ It is the responsibility of the application to specify all the headers,
+ including content type and length.
+ """
+ return DirectResult((body,), headers)
diff --git a/interfaces/__init__.py b/interfaces/__init__.py
new file mode 100644
index 0000000..0d18ebc
--- /dev/null
+++ b/interfaces/__init__.py
@@ -0,0 +1,465 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Interfaces for the publisher.
+
+$Id$
+"""
+
+import zope.deprecation
+
+from zope.interface import Interface
+from zope.interface import Attribute
+from zope.security.interfaces import Unauthorized
+from zope.component.interfaces import IPresentationRequest
+from zope.interface import implements
+from zope.interface.interfaces import IInterface
+from zope.interface.common.mapping import IEnumerableMapping
+from zope.interface.common.interfaces import IException
+from zope.security.interfaces import IParticipation
+
+# BBB : can be remove in 3.3
+zope.deprecation.__show__.off()
+from zope.exceptions import NotFoundError, INotFoundError
+zope.deprecation.__show__.on()
+
+class IPublishingException(IException):
+ pass
+
+class PublishingException(Exception):
+ implements(IPublishingException)
+
+class ITraversalException(IPublishingException):
+ pass
+
+class TraversalException(PublishingException):
+ implements(ITraversalException)
+
+class INotFound(INotFoundError, ITraversalException):
+ def getObject():
+ 'Returns the object that was being traversed.'
+
+ def getName():
+ 'Returns the name that was being traversed.'
+
+class NotFound(NotFoundError, TraversalException):
+ implements(INotFound)
+
+ def __init__(self, ob, name, request=None):
+ self.ob = ob
+ self.name = name
+
+ def getObject(self):
+ return self.ob
+
+ def getName(self):
+ return self.name
+
+ def __str__(self):
+ try:
+ ob = `self.ob`
+ except:
+ ob = 'unprintable object'
+ return 'Object: %s, name: %s' % (ob, `self.name`)
+
+class IDebugError(ITraversalException):
+ def getObject():
+ 'Returns the object being traversed.'
+
+ def getMessage():
+ 'Returns the debug message.'
+
+class DebugError(TraversalException):
+ implements(IDebugError)
+
+ def __init__(self, ob, message):
+ self.ob = ob
+ self.message = message
+
+ def getObject(self):
+ return self.ob
+
+ def getMessage(self):
+ return self.message
+
+ def __str__(self):
+ return self.message
+
+class IBadRequest(IPublishingException):
+ def __str__():
+ 'Returns the error message.'
+
+class BadRequest(PublishingException):
+
+ implements(IBadRequest)
+
+ def __init__(self, message):
+ self.message = message
+
+ def __str__(self):
+ return self.message
+
+class IRedirect(IPublishingException):
+ def getLocation():
+ 'Returns the location.'
+
+class Redirect(PublishingException):
+
+ implements(IRedirect)
+
+ def __init__(self, location):
+ self.location = location
+
+ def getLocation(self):
+ return self.location
+
+ def __str__(self):
+ return 'Location: %s' % self.location
+
+class IRetry(IPublishingException):
+ def getOriginalException():
+ 'Returns the original exception object.'
+
+class Retry(PublishingException):
+ """Raise this to retry a request."""
+
+ implements(IRetry)
+
+ def __init__(self, orig_exc=None):
+ self.orig_exc = orig_exc
+
+ def getOriginalException(self):
+ return self.orig_exc
+
+ def __str__(self):
+ return repr(self.orig_exc)
+
+
+class IExceptionSideEffects(Interface):
+ """An exception caught by the publisher is adapted to this so that
+ it can have persistent side-effects."""
+
+ def __call__(obj, request, exc_info):
+ """Effect persistent side-effects.
+
+ Arguments are:
+ obj context-wrapped object that was published
+ request the request
+ exc_info the exception info being handled
+
+ """
+
+
+class IPublishTraverse(Interface):
+
+ def publishTraverse(request, name):
+ """Lookup a name
+
+ The request argument is the publisher request object.
+
+ If a lookup is not possible, raise a NotFound error.
+
+ This method should return an object having the specified name and
+ `self` as parent. The method can use the request to determine the
+ correct object.
+ """
+
+
+class IPublisher(Interface):
+
+ def publish(request):
+ """Publish a request
+
+ The request must be an IPublisherRequest.
+ """
+
+class IResponse(Interface):
+ """Interface used by the publsher"""
+
+ def setResult(result):
+ """Sets the response result value.
+ """
+
+ def handleException(exc_info):
+ """Handles an otherwise unhandled exception.
+
+ The publication object gets the first chance to handle an exception,
+ and if it doesn't have a good way to do it, it defers to the
+ response. Implementations should set the reponse body.
+ """
+
+ def internalError():
+ """Called when the exception handler bombs.
+
+ Should report back to the client that an internal error occurred.
+ """
+
+ def reset():
+ """Reset the output result.
+
+ Reset the response by nullifying already set variables.
+ """
+
+ def retry():
+ """Returns a retry response
+
+ Returns a response suitable for repeating the publication attempt.
+ """
+
+
+class IPublication(Interface):
+ """Object publication framework.
+
+ The responsibility of publication objects is to provide
+ application hooks for the publishing process. This allows
+ application-specific tasks, such as connecting to databases,
+ managing transactions, and setting security contexts to be invoked
+ during the publishing process.
+ """
+ # The order of the hooks mostly corresponds with the order in which
+ # they are invoked.
+
+ def beforeTraversal(request):
+ """Pre-traversal hook.
+
+ This is called *once* before any traversal has been done.
+ """
+
+ def getApplication(request):
+ """Returns the object where traversal should commence.
+ """
+
+ def callTraversalHooks(request, ob):
+ """Invokes any traversal hooks associated with the object.
+
+ This is called before traversing each object. The ob argument
+ is the object that is about to be traversed.
+ """
+
+ def traverseName(request, ob, name):
+ """Traverses to the next object.
+ """
+
+ def afterTraversal(request, ob):
+ """Post-traversal hook.
+
+ This is called after all traversal.
+ """
+
+ def callObject(request, ob):
+ """Call the object, returning the result.
+
+ For GET/POST this means calling it, but for other methods
+ (including those of WebDAV and FTP) this might mean invoking
+ a method of an adapter.
+ """
+
+ def afterCall(request, ob):
+ """Post-callObject hook (if it was successful).
+ """
+
+ def handleException(object, request, exc_info, retry_allowed=1):
+ """Handle an exception
+
+ Either:
+ - sets the body of the response, request.response, or
+ - raises a Retry exception, or
+ - throws another exception, which is a Bad Thing.
+
+ Note that this method should not leak, which means that
+ exc_info must be set to some other value before exiting the method.
+ """
+
+ def endRequest(request, ob):
+ """Do any end-of-request cleanup
+ """
+
+
+class IPublicationRequest(IPresentationRequest, IParticipation):
+ """Interface provided by requests to IPublication objects
+ """
+
+ response = Attribute("""The request's response object
+
+ Return an IPublisherResponse for the request.
+ """)
+
+ def close():
+ """Release resources held by the request.
+ """
+
+ def hold(held):
+ """Hold a reference to an object until the request is closed.
+
+ The object should be an IHeld. If it is an IHeld, it's
+ release method will be called when it is released.
+ """
+
+ def getTraversalStack():
+ """Return the request traversal stack
+
+ This is a sequence of steps to traverse in reverse order. They
+ will be traversed from last to first.
+ """
+
+ def setTraversalStack(stack):
+ """Change the traversal stack.
+
+ See getTraversalStack.
+ """
+
+ def getPositionalArguments():
+ """Return the positional arguments given to the request.
+ """
+
+ def setPrincipal(principal):
+ """Set the principal attribute.
+
+ It should be IPrincipal wrapped in it's AuthenticationService's context.
+ """
+
+class IHeld(Interface):
+ """Object to be held and explicitly released by a request
+ """
+
+ def release():
+ """Release the held object
+
+ This is called by a request that holds the IHeld when the
+ request is closed
+
+ """
+
+class IPublisherRequest(IPublicationRequest):
+ """Request interface use by the publisher
+
+ The responsibility of requests is to encapsulate protocol
+ specific details, especially wrt request inputs.
+
+ Request objects also serve as "context" objects, providing
+ construction of and access to responses and storage of publication
+ objects.
+ """
+
+ def supportsRetry():
+ """Check whether the request supports retry
+
+ Return a boolean value indicating whether the request can be retried.
+ """
+
+ def retry():
+ """Return a retry request
+
+ Return a request suitable for repeating the publication attempt.
+ """
+
+ publication = Attribute("""The request's publication object
+
+ The publication object, an IRequestPublication provides
+ application-specific functionality hooks.
+ """)
+
+ def setPublication(publication):
+ """Set the request's publication object
+ """
+
+ def traverse(object):
+ """Traverse from the given object to the published object
+
+ The published object is returned.
+
+ The following hook methods on the publication will be called:
+
+ - callTraversalHooks is called before each step and after
+ the last step.
+
+ - traverseName to actually do a single traversal
+
+ """
+
+ def processInputs():
+ """Do any input processing that needs to be done before traversing
+
+ This is done after construction to allow the publisher to
+ handle errors that arise.
+ """
+
+
+class IDebugFlags(Interface):
+ """Features that support debugging."""
+
+ sourceAnnotations = Attribute("""Enable ZPT source annotations""")
+ showTAL = Attribute("""Leave TAL markup in rendered page templates""")
+
+
+class IApplicationRequest(IEnumerableMapping):
+ """Features that support application logic
+ """
+
+ principal = Attribute("""Principal object associated with the request
+ This is a read-only attribute.
+ """)
+
+ bodyStream = Attribute(
+ """The stream that provides the data of the request.
+
+ The data returned by the stream will not include any possible header
+ information, which should have been stripped by the server (or
+ previous layer) before.
+
+ Also, the body stream might already be read and not return any
+ data. This is commonly done when retrieving the data for the ``body``
+ attribute.
+
+ If you access this stream directly to retrieve data, it will not be
+ possible by other parts of the framework to access the data of the
+ request via the ``body`` attribute.""")
+
+ debug = Attribute("""Debug flags (see IDebugFlags).""")
+
+ def __getitem__(key):
+ """Return request data
+
+ The only request data are environment variables.
+ """
+
+ environment = Attribute(
+ """Request environment data
+
+ This is a read-only mapping from variable name to value.
+ """)
+
+ annotations = Attribute(
+ """Stores arbitrary application data under package-unique keys.
+
+ By "package-unique keys", we mean keys that are are unique by
+ virtue of including the dotted name of a package as a prefex. A
+ package name is used to limit the authority for picking names for
+ a package to the people using that package.
+
+ For example, when implementing annotations for hypothetical
+ request-persistent adapters in a hypothetical zope.persistentadapter
+ package, the key would be (or at least begin with) the following::
+
+ "zope.persistentadapter"
+ """)
+
+
+class IRequest(IPublisherRequest, IPublicationRequest, IApplicationRequest):
+ """The basic request contract
+ """
+
+
+class ILayer(IInterface):
+ """A grouping of related views for a request."""
+
diff --git a/tests/basetestiapplicationrequest.py b/tests/basetestiapplicationrequest.py
new file mode 100644
index 0000000..fea56c8
--- /dev/null
+++ b/tests/basetestiapplicationrequest.py
@@ -0,0 +1,48 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""IApplicationRequest Base Test
+
+$Id$
+"""
+from zope.interface.verify import verifyObject
+from zope.publisher.interfaces import IApplicationRequest
+
+from zope.interface.common.tests.basemapping import BaseTestIEnumerableMapping
+
+from zope.interface.common.tests.basemapping import testIReadMapping
+
+
+class BaseTestIApplicationRequest(BaseTestIEnumerableMapping):
+ def testVerifyIApplicationRequest(self):
+ verifyObject(IApplicationRequest, self._Test__new())
+
+ def testHaveCustomTestsForIApplicationRequest(self):
+ # Make sure that tests are defined for things we can't test here
+ self.test_IApplicationRequest_bodyStream
+
+ def testEnvironment(self):
+ request = self._Test__new(foo='Foo', bar='Bar')
+
+ try:
+ request.environment = {}
+ except AttributeError:
+ pass
+ else:
+ raise "Shouldn't be able to set environment"
+
+ environment = request.environment
+
+ testIReadMapping(self, environment,
+ {'foo': 'Foo', 'bar': 'Bar'},
+ ['splat'])
diff --git a/tests/test_baserequest.py b/tests/test_baserequest.py
new file mode 100644
index 0000000..9f7ceff
--- /dev/null
+++ b/tests/test_baserequest.py
@@ -0,0 +1,96 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""baserequest tests
+
+$Id$
+"""
+from unittest import TestCase, main, makeSuite
+
+from zope.publisher.tests.basetestipublicationrequest \
+ import BaseTestIPublicationRequest
+
+from zope.publisher.tests.basetestipublisherrequest \
+ import BaseTestIPublisherRequest
+
+from zope.publisher.tests.basetestiapplicationrequest \
+ import BaseTestIApplicationRequest
+
+from StringIO import StringIO
+
+class TestBaseRequest(BaseTestIPublicationRequest,
+ BaseTestIApplicationRequest,
+ BaseTestIPublisherRequest,
+ TestCase):
+
+ def _Test__new(self, **kw):
+ from zope.publisher.base import BaseRequest
+ return BaseRequest(StringIO(''), kw)
+
+ def _Test__expectedViewType(self):
+ return None # we don't expect
+
+ def test_IApplicationRequest_bodyStream(self):
+ from zope.publisher.base import BaseRequest
+
+ request = BaseRequest(StringIO('spam'), {})
+ self.assertEqual(request.bodyStream.read(), 'spam')
+
+ def test_IPublicationRequest_getPositionalArguments(self):
+ self.assertEqual(self._Test__new().getPositionalArguments(), ())
+
+ def test_IPublisherRequest_retry(self):
+ self.assertEqual(self._Test__new().supportsRetry(), 0)
+
+ def test_IPublisherRequest_traverse(self):
+ from zope.publisher.tests.publication import TestPublication
+ request = self._Test__new()
+ request.setPublication(TestPublication())
+ app = request.publication.getApplication(request)
+
+ request.setTraversalStack([])
+ self.assertEqual(request.traverse(app).name, '')
+ self.assertEqual(request._last_obj_traversed, app)
+ request.setTraversalStack(['ZopeCorp'])
+ self.assertEqual(request.traverse(app).name, 'ZopeCorp')
+ self.assertEqual(request._last_obj_traversed, app.ZopeCorp)
+ request.setTraversalStack(['Engineering', 'ZopeCorp'])
+ self.assertEqual(request.traverse(app).name, 'Engineering')
+ self.assertEqual(request._last_obj_traversed, app.ZopeCorp.Engineering)
+
+ def test_IPublisherRequest_processInputs(self):
+ self._Test__new().processInputs()
+
+ def test_AnnotationsExist(self):
+ self.assertEqual(self._Test__new().annotations, {})
+
+ # Needed by BaseTestIEnumerableMapping tests:
+ def _IEnumerableMapping__stateDict(self):
+ return {'id': 'ZopeOrg', 'title': 'Zope Community Web Site',
+ 'greet': 'Welcome to the Zope Community Web site'}
+
+ def _IEnumerableMapping__sample(self):
+ return self._Test__new(**(self._IEnumerableMapping__stateDict()))
+
+ def _IEnumerableMapping__absentKeys(self):
+ return 'foo', 'bar'
+
+ def test_SetRequestInResponse(self):
+ request = self._Test__new()
+ self.assertEqual(request.response._request, request)
+
+def test_suite():
+ return makeSuite(TestBaseRequest)
+
+if __name__=='__main__':
+ main(defaultTest='test_suite')
diff --git a/tests/test_http.py b/tests/test_http.py
new file mode 100644
index 0000000..3573356
--- /dev/null
+++ b/tests/test_http.py
@@ -0,0 +1,633 @@
+# -*- coding: latin-1 -*-
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""HTTP Publisher Tests
+
+$Id$
+"""
+import unittest
+
+from zope.interface import implements
+from zope.publisher.interfaces.logginginfo import ILoggingInfo
+from zope.publisher.http import HTTPRequest, HTTPResponse
+from zope.publisher.http import HTTPInputStream, StrResult
+from zope.publisher.publish import publish
+from zope.publisher.base import DefaultPublication
+from zope.publisher.interfaces.http import IHTTPRequest, IHTTPResponse
+from zope.publisher.interfaces.http import IHTTPApplicationResponse
+from zope.publisher.interfaces import IResponse
+
+from zope.i18n.interfaces.locales import ILocale
+
+from zope.interface.verify import verifyObject
+
+from StringIO import StringIO
+from Cookie import CookieError
+
+
+class UserStub(object):
+ implements(ILoggingInfo)
+
+ def __init__(self, id):
+ self._id = id
+
+ def getId(self):
+ return self._id
+
+ def getLogMessage(self):
+ return self._id
+
+
+data = '''\
+line 1
+line 2
+line 3'''
+
+
+class HTTPInputStreamTests(unittest.TestCase):
+
+ def setUp(self):
+ self.stream = HTTPInputStream(StringIO(data))
+
+ def getCacheStreamValue(self):
+ self.stream.cacheStream.seek(0)
+ return self.stream.cacheStream.read()
+
+ def testRead(self):
+ output = ''
+ self.assertEqual(output, self.getCacheStreamValue())
+ output += self.stream.read(5)
+ self.assertEqual(output, self.getCacheStreamValue())
+ output += self.stream.read()
+ self.assertEqual(output, self.getCacheStreamValue())
+ self.assertEqual(data, self.getCacheStreamValue())
+
+ def testReadLine(self):
+ output = self.stream.readline()
+ self.assertEqual(output, self.getCacheStreamValue())
+ output += self.stream.readline()
+ self.assertEqual(output, self.getCacheStreamValue())
+ output += self.stream.readline()
+ self.assertEqual(output, self.getCacheStreamValue())
+ output += self.stream.readline()
+ self.assertEqual(output, self.getCacheStreamValue())
+ self.assertEqual(data, self.getCacheStreamValue())
+
+ def testReadLines(self):
+ output = ''.join(self.stream.readlines(4))
+ self.assertEqual(output, self.getCacheStreamValue())
+ output += ''.join(self.stream.readlines())
+ self.assertEqual(output, self.getCacheStreamValue())
+ self.assertEqual(data, self.getCacheStreamValue())
+
+ def testGetChacheStream(self):
+ self.stream.read(5)
+ self.assertEqual(data, self.stream.getCacheStream().read())
+
+
+class HTTPTests(unittest.TestCase):
+
+ _testEnv = {
+ 'PATH_INFO': '/folder/item',
+ 'a': '5',
+ 'b': 6,
+ 'SERVER_URL': 'http://foobar.com',
+ 'HTTP_HOST': 'foobar.com',
+ 'CONTENT_LENGTH': '0',
+ 'HTTP_AUTHORIZATION': 'Should be in accessible',
+ 'GATEWAY_INTERFACE': 'TestFooInterface/1.0',
+ 'HTTP_OFF_THE_WALL': "Spam 'n eggs",
+ 'HTTP_ACCEPT_CHARSET': 'ISO-8859-1, UTF-8;q=0.66, UTF-16;q=0.33',
+ }
+
+ def setUp(self):
+ class AppRoot(object):
+ """Required docstring for the publisher."""
+
+ class Folder(object):
+ """Required docstring for the publisher."""
+
+ class Item(object):
+ """Required docstring for the publisher."""
+ def __call__(self, a, b):
+ return "%s, %s" % (`a`, `b`)
+
+ self.app = AppRoot()
+ self.app.folder = Folder()
+ self.app.folder.item = Item()
+ self.app.xxx = Item()
+
+ def _createRequest(self, extra_env={}, body=""):
+ env = self._testEnv.copy()
+ env.update(extra_env)
+ if len(body):
+ env['CONTENT_LENGTH'] = str(len(body))
+
+ publication = DefaultPublication(self.app)
+ instream = StringIO(body)
+ request = HTTPRequest(instream, env)
+ request.setPublication(publication)
+ return request
+
+ def _publisherResults(self, extra_env={}, body=""):
+ request = self._createRequest(extra_env, body)
+ response = request.response
+ publish(request, handle_errors=False)
+ headers = response.getHeaders()
+ headers.sort()
+ return (
+ "Status: %s\r\n" % response.getStatusString()
+ +
+ "\r\n".join([("%s: %s" % h) for h in headers]) + "\r\n\r\n"
+ +
+ ''.join(response.consumeBody())
+ )
+
+ def test_repr(self):
+ request = self._createRequest()
+ expect = '<%s.%s instance URL=http://foobar.com>' % (
+ request.__class__.__module__, request.__class__.__name__)
+ self.assertEqual(repr(request), expect)
+
+ def testTraversalToItem(self):
+ res = self._publisherResults()
+ self.failUnlessEqual(
+ res,
+ "Status: 200 Ok\r\n"
+ "Content-Length: 6\r\n"
+ "X-Powered-By: Zope (www.zope.org), Python (www.python.org)\r\n"
+ "\r\n"
+ "'5', 6")
+
+ def testRedirect(self):
+ # test HTTP/1.0
+ env = {'SERVER_PROTOCOL':'HTTP/1.0'}
+
+ request = self._createRequest(env, '')
+ location = request.response.redirect('http://foobar.com/redirected')
+ self.assertEquals(location, 'http://foobar.com/redirected')
+ self.assertEquals(request.response.getStatus(), 302)
+ self.assertEquals(request.response.getHeader('location'), location)
+
+ # test HTTP/1.1
+ env = {'SERVER_PROTOCOL':'HTTP/1.1'}
+
+ request = self._createRequest(env, '')
+ location = request.response.redirect('http://foobar.com/redirected')
+ self.assertEquals(request.response.getStatus(), 303)
+
+ # test explicit status
+ request = self._createRequest(env, '')
+ request.response.redirect('http://foobar.com/explicit', 304)
+ self.assertEquals(request.response.getStatus(), 304)
+
+ def testRequestEnvironment(self):
+ req = self._createRequest()
+ publish(req, handle_errors=0) # Force expansion of URL variables
+
+ self.assertEquals(str(req.URL), 'http://foobar.com/folder/item')
+ self.assertEquals(req.URL['-1'], 'http://foobar.com/folder')
+ self.assertEquals(req.URL['-2'], 'http://foobar.com')
+ self.assertRaises(KeyError, req.URL.__getitem__, '-3')
+
+ self.assertEquals(req.URL['0'], 'http://foobar.com')
+ self.assertEquals(req.URL['1'], 'http://foobar.com/folder')
+ self.assertEquals(req.URL['2'], 'http://foobar.com/folder/item')
+ self.assertRaises(KeyError, req.URL.__getitem__, '3')
+
+ self.assertEquals(req.URL.get('0'), 'http://foobar.com')
+ self.assertEquals(req.URL.get('1'), 'http://foobar.com/folder')
+ self.assertEquals(req.URL.get('2'), 'http://foobar.com/folder/item')
+ self.assertEquals(req.URL.get('3', 'none'), 'none')
+
+ self.assertEquals(req['SERVER_URL'], 'http://foobar.com')
+ self.assertEquals(req['HTTP_HOST'], 'foobar.com')
+ self.assertEquals(req['PATH_INFO'], '/folder/item')
+ self.assertEquals(req['CONTENT_LENGTH'], '0')
+ self.assertRaises(KeyError, req.__getitem__, 'HTTP_AUTHORIZATION')
+ self.assertEquals(req['GATEWAY_INTERFACE'], 'TestFooInterface/1.0')
+ self.assertEquals(req['HTTP_OFF_THE_WALL'], "Spam 'n eggs")
+
+ self.assertRaises(KeyError, req.__getitem__,
+ 'HTTP_WE_DID_NOT_PROVIDE_THIS')
+
+ def testRequestLocale(self):
+ eq = self.assertEqual
+ unless = self.failUnless
+
+ from zope.component import provideAdapter
+ from zope.publisher.browser import BrowserLanguages
+ from zope.publisher.interfaces.http import IHTTPRequest
+ from zope.i18n.interfaces import IUserPreferredLanguages
+ provideAdapter(BrowserLanguages, [IHTTPRequest],
+ IUserPreferredLanguages)
+
+ for httplang in ('it', 'it-ch', 'it-CH', 'IT', 'IT-CH', 'IT-ch'):
+ req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': httplang})
+ locale = req.locale
+ unless(ILocale.providedBy(locale))
+ parts = httplang.split('-')
+ lang = parts.pop(0).lower()
+ territory = variant = None
+ if parts:
+ territory = parts.pop(0).upper()
+ if parts:
+ variant = parts.pop(0).upper()
+ eq(locale.id.language, lang)
+ eq(locale.id.territory, territory)
+ eq(locale.id.variant, variant)
+ # Now test for non-existant locale fallback
+ req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': 'xx'})
+ locale = req.locale
+ unless(ILocale.providedBy(locale))
+ eq(locale.id.language, None)
+ eq(locale.id.territory, None)
+ eq(locale.id.variant, None)
+
+ # If the first language is not available we should try others
+ req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': 'xx,en;q=0.5'})
+ locale = req.locale
+ unless(ILocale.providedBy(locale))
+ eq(locale.id.language, 'en')
+ eq(locale.id.territory, None)
+ eq(locale.id.variant, None)
+
+ # Regression test: there was a bug where territory and variant were
+ # not reset
+ req = self._createRequest({'HTTP_ACCEPT_LANGUAGE': 'xx-YY,en;q=0.5'})
+ locale = req.locale
+ unless(ILocale.providedBy(locale))
+ eq(locale.id.language, 'en')
+ eq(locale.id.territory, None)
+ eq(locale.id.variant, None)
+
+ from zope.component.testing import tearDown
+ tearDown()
+
+ def testCookies(self):
+ cookies = {
+ 'HTTP_COOKIE':
+ 'foo=bar; path=/; spam="eggs", this="Should be accepted"'
+ }
+ req = self._createRequest(extra_env=cookies)
+
+ self.assertEquals(req.cookies[u'foo'], u'bar')
+ self.assertEquals(req[u'foo'], u'bar')
+
+ self.assertEquals(req.cookies[u'spam'], u'eggs')
+ self.assertEquals(req[u'spam'], u'eggs')
+
+ self.assertEquals(req.cookies[u'this'], u'Should be accepted')
+ self.assertEquals(req[u'this'], u'Should be accepted')
+
+ # Reserved key
+ self.failIf(req.cookies.has_key('path'))
+
+ def testCookieErrorToLog(self):
+ cookies = {
+ 'HTTP_COOKIE':
+ 'foo=bar; path=/; spam="eggs", ldap/OU="Williams"'
+ }
+ req = self._createRequest(extra_env=cookies)
+
+ self.failIf(req.cookies.has_key('foo'))
+ self.failIf(req.has_key('foo'))
+
+ self.failIf(req.cookies.has_key('spam'))
+ self.failIf(req.has_key('spam'))
+
+ self.failIf(req.cookies.has_key('ldap/OU'))
+ self.failIf(req.has_key('ldap/OU'))
+
+ # Reserved key
+ self.failIf(req.cookies.has_key('path'))
+
+ def testCookiesUnicode(self):
+ # Cookie values are assumed to be UTF-8 encoded
+ cookies = {'HTTP_COOKIE': r'key="\342\230\243";'}
+ req = self._createRequest(extra_env=cookies)
+ self.assertEquals(req.cookies[u'key'], u'\N{BIOHAZARD SIGN}')
+
+ def testHeaders(self):
+ headers = {
+ 'TEST_HEADER': 'test',
+ 'Another-Test': 'another',
+ }
+ req = self._createRequest(extra_env=headers)
+ self.assertEquals(req.headers[u'TEST_HEADER'], u'test')
+ self.assertEquals(req.headers[u'TEST-HEADER'], u'test')
+ self.assertEquals(req.headers[u'test_header'], u'test')
+ self.assertEquals(req.getHeader('TEST_HEADER', literal=True), u'test')
+ self.assertEquals(req.getHeader('TEST-HEADER', literal=True), None)
+ self.assertEquals(req.getHeader('test_header', literal=True), None)
+ self.assertEquals(req.getHeader('Another-Test', literal=True),
+ 'another')
+
+ def testBasicAuth(self):
+ from zope.publisher.interfaces.http import IHTTPCredentials
+ import base64
+ req = self._createRequest()
+ verifyObject(IHTTPCredentials, req)
+ lpq = req._authUserPW()
+ self.assertEquals(lpq, None)
+ env = {}
+ login, password = ("tim", "123")
+ s = base64.encodestring("%s:%s" % (login, password)).rstrip()
+ env['HTTP_AUTHORIZATION'] = "Basic %s" % s
+ req = self._createRequest(env)
+ lpw = req._authUserPW()
+ self.assertEquals(lpw, (login, password))
+
+ def testSetPrincipal(self):
+ req = self._createRequest()
+ req.setPrincipal(UserStub("jim"))
+ self.assertEquals(req.response.authUser, 'jim')
+
+ def test_method(self):
+ r = self._createRequest(extra_env={'REQUEST_METHOD':'SPAM'})
+ self.assertEqual(r.method, 'SPAM')
+ r = self._createRequest(extra_env={'REQUEST_METHOD':'eggs'})
+ self.assertEqual(r.method, 'EGGS')
+
+ def test_setApplicationServer(self):
+ req = self._createRequest()
+ req.setApplicationServer('foo')
+ self.assertEquals(req._app_server, 'http://foo')
+ req.setApplicationServer('foo', proto='https')
+ self.assertEquals(req._app_server, 'https://foo')
+ req.setApplicationServer('foo', proto='https', port=8080)
+ self.assertEquals(req._app_server, 'https://foo:8080')
+ req.setApplicationServer('foo', proto='http', port='9673')
+ self.assertEquals(req._app_server, 'http://foo:9673')
+ req.setApplicationServer('foo', proto='https', port=443)
+ self.assertEquals(req._app_server, 'https://foo')
+ req.setApplicationServer('foo', proto='https', port='443')
+ self.assertEquals(req._app_server, 'https://foo')
+ req.setApplicationServer('foo', port=80)
+ self.assertEquals(req._app_server, 'http://foo')
+ req.setApplicationServer('foo', proto='telnet', port=80)
+ self.assertEquals(req._app_server, 'telnet://foo:80')
+
+ def test_setApplicationNames(self):
+ req = self._createRequest()
+ names = ['x', 'y', 'z']
+ req.setVirtualHostRoot(names)
+ self.assertEquals(req._app_names, ['x', 'y', 'z'])
+ names[0] = 'muahahahaha'
+ self.assertEquals(req._app_names, ['x', 'y', 'z'])
+
+ def test_setVirtualHostRoot(self):
+ req = self._createRequest()
+ req._traversed_names = ['x', 'y']
+ req._last_obj_traversed = object()
+ req.setVirtualHostRoot()
+ self.failIf(req._traversed_names)
+ self.assertEquals(req._vh_root, req._last_obj_traversed)
+
+ def test_getVirtualHostRoot(self):
+ req = self._createRequest()
+ self.assertEquals(req.getVirtualHostRoot(), None)
+ req._vh_root = object()
+ self.assertEquals(req.getVirtualHostRoot(), req._vh_root)
+
+ def test_traverse(self):
+ req = self._createRequest()
+ req.traverse(self.app)
+ self.assertEquals(req._traversed_names, ['folder', 'item'])
+
+ # setting it during traversal matters
+ req = self._createRequest()
+ def hook(self, object, req=req, app=self.app):
+ if object is app.folder:
+ req.setVirtualHostRoot()
+ req.publication.callTraversalHooks = hook
+ req.traverse(self.app)
+ self.assertEquals(req._traversed_names, ['item'])
+ self.assertEquals(req._vh_root, self.app.folder)
+
+ def testInterface(self):
+ from zope.publisher.interfaces.http import IHTTPCredentials
+ from zope.publisher.interfaces.http import IHTTPApplicationRequest
+ rq = self._createRequest()
+ verifyObject(IHTTPRequest, rq)
+ verifyObject(IHTTPCredentials, rq)
+ verifyObject(IHTTPApplicationRequest, rq)
+
+ def testDeduceServerURL(self):
+ req = self._createRequest()
+ deduceServerURL = req._HTTPRequest__deduceServerURL
+ req._environ = {'HTTP_HOST': 'example.com:80'}
+ self.assertEquals(deduceServerURL(), 'http://example.com')
+ req._environ = {'HTTP_HOST': 'example.com:8080'}
+ self.assertEquals(deduceServerURL(), 'http://example.com:8080')
+ req._environ = {'HTTP_HOST': 'example.com:443', 'HTTPS': 'on'}
+ self.assertEquals(deduceServerURL(), 'https://example.com')
+ req._environ = {'HTTP_HOST': 'example.com:80', 'HTTPS': 'ON'}
+ self.assertEquals(deduceServerURL(), 'https://example.com:80')
+ req._environ = {'HTTP_HOST': 'example.com:8080',
+ 'SERVER_PORT_SECURE': '1'}
+ self.assertEquals(deduceServerURL(), 'https://example.com:8080')
+ req._environ = {'SERVER_NAME': 'example.com', 'SERVER_PORT':'8080',
+ 'SERVER_PORT_SECURE': '0'}
+ self.assertEquals(deduceServerURL(), 'http://example.com:8080')
+ req._environ = {'SERVER_NAME': 'example.com'}
+ self.assertEquals(deduceServerURL(), 'http://example.com')
+
+ def testUnicodeURLs(self):
+ # The request expects PATH_INFO to be utf-8 encoded when it gets it.
+ req = self._createRequest(
+ {'PATH_INFO': '/\xc3\xa4\xc3\xb6/\xc3\xbc\xc3\x9f/foo/bar.html'})
+ self.assertEqual(req._traversal_stack,
+ [u'bar.html', u'foo', u'üß', u'äö'])
+ # the request should have converted PATH_INFO to unicode
+ self.assertEqual(req['PATH_INFO'], u'/äö/üß/foo/bar.html')
+
+
+class ConcreteHTTPTests(HTTPTests):
+ """Tests that we don't have to worry about subclasses inheriting and
+ breaking.
+ """
+
+ def test_shiftNameToApplication(self):
+ r = self._createRequest()
+ publish(r, handle_errors=0)
+ appurl = r.getApplicationURL()
+
+ # Verify that we can shift. It would be a little more realistic
+ # if we could test this during traversal, but the api doesn't
+ # let us do that.
+ r = self._createRequest(extra_env={"PATH_INFO": "/xxx"})
+ publish(r, handle_errors=0)
+ r.shiftNameToApplication()
+ self.assertEquals(r.getApplicationURL(), appurl+"/xxx")
+
+ # Verify that we can only shift if we've traversed only a single name
+ r = self._createRequest(extra_env={"PATH_INFO": "/folder/item"})
+ publish(r, handle_errors=0)
+ self.assertRaises(ValueError, r.shiftNameToApplication)
+
+
+
+class TestHTTPResponse(unittest.TestCase):
+
+ def testInterface(self):
+ rp = HTTPResponse()
+ verifyObject(IHTTPResponse, rp)
+ verifyObject(IHTTPApplicationResponse, rp)
+ verifyObject(IResponse, rp)
+
+ def _createResponse(self):
+ response = HTTPResponse()
+ return response
+
+ def _parseResult(self, response):
+ return dict(response.getHeaders()), ''.join(response.consumeBody())
+
+ def _getResultFromResponse(self, body, charset='utf-8', headers=None):
+ response = self._createResponse()
+ assert(charset == 'utf-8')
+ if headers is not None:
+ for hdr, val in headers.iteritems():
+ response.setHeader(hdr, val)
+ response.setResult(body)
+ return self._parseResult(response)
+
+ def testWrite_noContentLength(self):
+ response = self._createResponse()
+ # We have to set all the headers ourself, we choose not to provide a
+ # content-length header
+ response.setHeader('Content-Type', 'text/plain;charset=us-ascii')
+
+ # Output the data
+ data = 'a'*10
+ response.setResult(StrResult(data))
+
+ headers, body = self._parseResult(response)
+ # Check that the data have been written, and that the header
+ # has been preserved
+ self.assertEqual(headers['Content-Type'], 'text/plain;charset=us-ascii')
+ self.assertEqual(body, data)
+
+ # Make sure that no Content-Length header was added
+ self.assert_('Content-Length' not in headers)
+
+ def testContentLength(self):
+ eq = self.failUnlessEqual
+
+ headers, body = self._getResultFromResponse("test", "utf-8",
+ {"content-type": "text/plain"})
+ eq("4", headers["Content-Length"])
+ eq("test", body)
+
+ headers, body = self._getResultFromResponse(
+ u'\u0442\u0435\u0441\u0442', "utf-8",
+ {"content-type": "text/plain"})
+ eq("8", headers["Content-Length"])
+ eq('\xd1\x82\xd0\xb5\xd1\x81\xd1\x82', body)
+
+ def testContentType(self):
+ eq = self.failUnlessEqual
+
+ headers, body = self._getResultFromResponse("test", "utf-8")
+ eq("", headers.get("Content-Type", ""))
+ eq("test", body)
+
+ headers, body = self._getResultFromResponse(u"test",
+ headers={"content-type": "text/plain"})
+ eq("text/plain;charset=utf-8", headers["Content-Type"])
+ eq("test", body)
+
+ headers, body = self._getResultFromResponse(u"test", "utf-8",
+ {"content-type": "text/html"})
+ eq("text/html;charset=utf-8", headers["Content-Type"])
+ eq("test", body)
+
+ headers, body = self._getResultFromResponse(u"test", "utf-8",
+ {"content-type": "text/plain;charset=cp1251"})
+ eq("text/plain;charset=cp1251", headers["Content-Type"])
+ eq("test", body)
+
+ headers, body = self._getResultFromResponse("test", "utf-8",
+ {"content-type": "image/gif"})
+ eq("image/gif", headers["Content-Type"])
+ eq("test", body)
+
+ def _getCookieFromResponse(self, cookies):
+ # Shove the cookies through request, parse the Set-Cookie header
+ # and spit out a list of headers for examination
+ response = self._createResponse()
+ for name, value, kw in cookies:
+ response.setCookie(name, value, **kw)
+ response.setResult('test')
+ return [header[1]
+ for header in response.getHeaders()
+ if header[0] == "Set-Cookie"]
+
+ def testSetCookie(self):
+ c = self._getCookieFromResponse([
+ ('foo', 'bar', {}),
+ ])
+ self.failUnless('foo=bar;' in c, 'foo=bar; not in %r' % c)
+
+ c = self._getCookieFromResponse([
+ ('foo', 'bar', {}),
+ ('alpha', 'beta', {}),
+ ])
+ self.failUnless('foo=bar;' in c)
+ self.failUnless('alpha=beta;' in c)
+
+ c = self._getCookieFromResponse([
+ ('sign', u'\N{BIOHAZARD SIGN}', {}),
+ ])
+ self.failUnless(r'sign="\342\230\243";' in c)
+
+ self.assertRaises(
+ CookieError,
+ self._getCookieFromResponse,
+ [('path', 'invalid key', {}),]
+ )
+
+ c = self._getCookieFromResponse([
+ ('foo', 'bar', {
+ 'Expires': 'Sat, 12 Jul 2014 23:26:28 GMT',
+ 'domain': 'example.com',
+ 'pAth': '/froboz',
+ 'max_age': 3600,
+ 'comment': u'blah;\N{BIOHAZARD SIGN}?',
+ 'seCure': True,
+ }),
+ ])[0]
+ self.failUnless('foo=bar;' in c)
+ self.failUnless('expires=Sat, 12 Jul 2014 23:26:28 GMT;' in c, repr(c))
+ self.failUnless('Domain=example.com;' in c)
+ self.failUnless('Path=/froboz;' in c)
+ self.failUnless('Max-Age=3600;' in c)
+ self.failUnless('Comment=blah%3B%E2%98%A3?;' in c, repr(c))
+ self.failUnless('secure;' in c)
+
+ c = self._getCookieFromResponse([('foo', 'bar', {'secure': False})])[0]
+ self.failUnless('foo=bar;' in c)
+ self.failIf('secure' in c)
+
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(ConcreteHTTPTests))
+ suite.addTest(unittest.makeSuite(TestHTTPResponse))
+ suite.addTest(unittest.makeSuite(HTTPInputStreamTests))
+ return suite
+
+
+if __name__ == '__main__':
+ unittest.main()