summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorpje <pje@571e12c6-e1fa-0310-aee7-ff1267fa46bd>2006-06-05 17:48:00 +0000
committerpje <pje@571e12c6-e1fa-0310-aee7-ff1267fa46bd>2006-06-05 17:48:00 +0000
commitc59b8b067ce2e70a4a11238ac0bd727ff68254c0 (patch)
treec8ae3c4144a125303ecb1bc26c2d16216cfd37be /src
parent1c9e934df59c3d10d91868a5bad8df37c687569e (diff)
downloadwsgiref-c59b8b067ce2e70a4a11238ac0bd727ff68254c0.tar.gz
Get rid of 'src' subdirectory
git-svn-id: svn://svn.eby-sarna.com/svnroot/wsgiref@2167 571e12c6-e1fa-0310-aee7-ff1267fa46bd
Diffstat (limited to 'src')
-rw-r--r--src/wsgiref/__init__.py23
-rw-r--r--src/wsgiref/handlers.py492
-rw-r--r--src/wsgiref/headers.py205
-rw-r--r--src/wsgiref/simple_server.py205
-rw-r--r--src/wsgiref/tests/__init__.py82
-rw-r--r--src/wsgiref/tests/test_handlers.py246
-rw-r--r--src/wsgiref/tests/test_headers.py123
-rw-r--r--src/wsgiref/tests/test_util.py205
-rw-r--r--src/wsgiref/util.py205
-rwxr-xr-xsrc/wsgiref/validate.py429
10 files changed, 0 insertions, 2215 deletions
diff --git a/src/wsgiref/__init__.py b/src/wsgiref/__init__.py
deleted file mode 100644
index 46c579f..0000000
--- a/src/wsgiref/__init__.py
+++ /dev/null
@@ -1,23 +0,0 @@
-"""wsgiref -- a WSGI (PEP 333) Reference Library
-
-Current Contents:
-
-* util -- Miscellaneous useful functions and wrappers
-
-* headers -- Manage response headers
-
-* handlers -- base classes for server/gateway implementations
-
-* simple_server -- a simple BaseHTTPServer that supports WSGI
-
-* validate -- validation wrapper that sits between an app and a server
- to detect errors in either
-
-To-Do:
-
-* cgi_gateway -- Run WSGI apps under CGI (pending a deployment standard)
-
-* cgi_wrapper -- Run CGI apps under WSGI
-
-* router -- a simple middleware component that handles URL traversal
-"""
diff --git a/src/wsgiref/handlers.py b/src/wsgiref/handlers.py
deleted file mode 100644
index 572cf0d..0000000
--- a/src/wsgiref/handlers.py
+++ /dev/null
@@ -1,492 +0,0 @@
-"""Base classes for server/gateway implementations"""
-
-from types import StringType
-from util import FileWrapper, guess_scheme, is_hop_by_hop
-from headers import Headers
-
-import sys, os, time
-
-try:
- dict
-except NameError:
- def dict(items):
- d = {}
- for k,v in items:
- d[k] = v
- return d
-
-try:
- True
- False
-except NameError:
- True = not None
- False = not True
-
-
-# Weekday and month names for HTTP date/time formatting; always English!
-_weekdayname = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
-_monthname = [None, # Dummy so we can use 1-based month numbers
- "Jan", "Feb", "Mar", "Apr", "May", "Jun",
- "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
-
-def format_date_time(timestamp):
- year, month, day, hh, mm, ss, wd, y, z = time.gmtime(timestamp)
- return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
- _weekdayname[wd], day, _monthname[month], year, hh, mm, ss
- )
-
-
-
-
-
-class BaseHandler:
- """Manage the invocation of a WSGI application"""
-
- # Configuration parameters; can override per-subclass or per-instance
- wsgi_version = (1,0)
- wsgi_multithread = True
- wsgi_multiprocess = True
- wsgi_run_once = False
-
- origin_server = True # We are transmitting direct to client
- http_version = "1.0" # Version that should be used for response
- server_software = None # String name of server software, if any
-
- # os_environ is used to supply configuration from the OS environment:
- # by default it's a copy of 'os.environ' as of import time, but you can
- # override this in e.g. your __init__ method.
- os_environ = dict(os.environ.items())
-
- # Collaborator classes
- wsgi_file_wrapper = FileWrapper # set to None to disable
- headers_class = Headers # must be a Headers-like class
-
- # Error handling (also per-subclass or per-instance)
- traceback_limit = None # Print entire traceback to self.get_stderr()
- error_status = "500 Dude, this is whack!"
- error_headers = [('Content-Type','text/plain')]
- error_body = "A server error occurred. Please contact the administrator."
-
- # State variables (don't mess with these)
- status = result = None
- headers_sent = False
- headers = None
- bytes_sent = 0
-
-
-
-
-
-
-
-
- def run(self, application):
- """Invoke the application"""
- # Note to self: don't move the close()! Asynchronous servers shouldn't
- # call close() from finish_response(), so if you close() anywhere but
- # the double-error branch here, you'll break asynchronous servers by
- # prematurely closing. Async servers must return from 'run()' without
- # closing if there might still be output to iterate over.
- try:
- self.setup_environ()
- self.result = application(self.environ, self.start_response)
- self.finish_response()
- except:
- try:
- self.handle_error()
- except:
- # If we get an error handling an error, just give up already!
- self.close()
- raise # ...and let the actual server figure it out.
-
-
- def setup_environ(self):
- """Set up the environment for one request"""
-
- env = self.environ = self.os_environ.copy()
- self.add_cgi_vars()
-
- env['wsgi.input'] = self.get_stdin()
- env['wsgi.errors'] = self.get_stderr()
- env['wsgi.version'] = self.wsgi_version
- env['wsgi.run_once'] = self.wsgi_run_once
- env['wsgi.url_scheme'] = self.get_scheme()
- env['wsgi.multithread'] = self.wsgi_multithread
- env['wsgi.multiprocess'] = self.wsgi_multiprocess
-
- if self.wsgi_file_wrapper is not None:
- env['wsgi.file_wrapper'] = self.wsgi_file_wrapper
-
- if self.origin_server and self.server_software:
- env.setdefault('SERVER_SOFTWARE',self.server_software)
-
-
- def finish_response(self):
- """Send any iterable data, then close self and the iterable
-
- Subclasses intended for use in asynchronous servers will
- want to redefine this method, such that it sets up callbacks
- in the event loop to iterate over the data, and to call
- 'self.close()' once the response is finished.
- """
- if not self.result_is_file() or not self.sendfile():
- for data in self.result:
- self.write(data)
- self.finish_content()
- self.close()
-
-
- def get_scheme(self):
- """Return the URL scheme being used"""
- return guess_scheme(self.environ)
-
-
- def set_content_length(self):
- """Compute Content-Length or switch to chunked encoding if possible"""
- try:
- blocks = len(self.result)
- except (TypeError,AttributeError,NotImplementedError):
- pass
- else:
- if blocks==1:
- self.headers['Content-Length'] = str(self.bytes_sent)
- return
- # XXX Try for chunked encoding if origin server and client is 1.1
-
-
- def cleanup_headers(self):
- """Make any necessary header changes or defaults
-
- Subclasses can extend this to add other defaults.
- """
- if not self.headers.has_key('Content-Length'):
- self.set_content_length()
-
- def start_response(self, status, headers,exc_info=None):
- """'start_response()' callable as specified by PEP 333"""
-
- if exc_info:
- try:
- if self.headers_sent:
- # Re-raise original exception if headers sent
- raise exc_info[0], exc_info[1], exc_info[2]
- finally:
- exc_info = None # avoid dangling circular ref
- elif self.headers is not None:
- raise AssertionError("Headers already set!")
-
- assert type(status) is StringType,"Status must be a string"
- assert len(status)>=4,"Status must be at least 4 characters"
- assert int(status[:3]),"Status message must begin w/3-digit code"
- assert status[3]==" ", "Status message must have a space after code"
- if __debug__:
- for name,val in headers:
- assert type(name) is StringType,"Header names must be strings"
- assert type(val) is StringType,"Header values must be strings"
- assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed"
- self.status = status
- self.headers = self.headers_class(headers)
- return self.write
-
-
- def send_preamble(self):
- """Transmit version/status/date/server, via self._write()"""
- if self.origin_server:
- if self.client_is_modern():
- self._write('HTTP/%s %s\r\n' % (self.http_version,self.status))
- if not self.headers.has_key('Date'):
- self._write(
- 'Date: %s\r\n' % format_date_time(time.time())
- )
- if self.server_software and not self.headers.has_key('Server'):
- self._write('Server: %s\r\n' % self.server_software)
- else:
- self._write('Status: %s\r\n' % self.status)
-
- def write(self, data):
- """'write()' callable as specified by PEP 333"""
-
- assert type(data) is StringType,"write() argument must be string"
-
- if not self.status:
- raise AssertionError("write() before start_response()")
-
- elif not self.headers_sent:
- # Before the first output, send the stored headers
- self.bytes_sent = len(data) # make sure we know content-length
- self.send_headers()
- else:
- self.bytes_sent += len(data)
-
- # XXX check Content-Length and truncate if too many bytes written?
- self._write(data)
- self._flush()
-
-
- def sendfile(self):
- """Platform-specific file transmission
-
- Override this method in subclasses to support platform-specific
- file transmission. It is only called if the application's
- return iterable ('self.result') is an instance of
- 'self.wsgi_file_wrapper'.
-
- This method should return a true value if it was able to actually
- transmit the wrapped file-like object using a platform-specific
- approach. It should return a false value if normal iteration
- should be used instead. An exception can be raised to indicate
- that transmission was attempted, but failed.
-
- NOTE: this method should call 'self.send_headers()' if
- 'self.headers_sent' is false and it is going to attempt direct
- transmission of the file1.
- """
- return False # No platform-specific transmission by default
-
-
- def finish_content(self):
- """Ensure headers and content have both been sent"""
- if not self.headers_sent:
- self.headers['Content-Length'] = "0"
- self.send_headers()
- else:
- pass # XXX check if content-length was too short?
-
- def close(self):
- """Close the iterable (if needed) and reset all instance vars
-
- Subclasses may want to also drop the client connection.
- """
- try:
- if hasattr(self.result,'close'):
- self.result.close()
- finally:
- self.result = self.headers = self.status = self.environ = None
- self.bytes_sent = 0; self.headers_sent = False
-
-
- def send_headers(self):
- """Transmit headers to the client, via self._write()"""
- self.cleanup_headers()
- self.headers_sent = True
- if not self.origin_server or self.client_is_modern():
- self.send_preamble()
- self._write(str(self.headers))
-
-
- def result_is_file(self):
- """True if 'self.result' is an instance of 'self.wsgi_file_wrapper'"""
- wrapper = self.wsgi_file_wrapper
- return wrapper is not None and isinstance(self.result,wrapper)
-
-
- def client_is_modern(self):
- """True if client can accept status and headers"""
- return self.environ['SERVER_PROTOCOL'].upper() != 'HTTP/0.9'
-
-
- def log_exception(self,exc_info):
- """Log the 'exc_info' tuple in the server log
-
- Subclasses may override to retarget the output or change its format.
- """
- try:
- from traceback import print_exception
- stderr = self.get_stderr()
- print_exception(
- exc_info[0], exc_info[1], exc_info[2],
- self.traceback_limit, stderr
- )
- stderr.flush()
- finally:
- exc_info = None
-
- def handle_error(self):
- """Log current error, and send error output to client if possible"""
- self.log_exception(sys.exc_info())
- if not self.headers_sent:
- self.result = self.error_output(self.environ, self.start_response)
- self.finish_response()
- # XXX else: attempt advanced recovery techniques for HTML or text?
-
- def error_output(self, environ, start_response):
- """WSGI mini-app to create error output
-
- By default, this just uses the 'error_status', 'error_headers',
- and 'error_body' attributes to generate an output page. It can
- be overridden in a subclass to dynamically generate diagnostics,
- choose an appropriate message for the user's preferred language, etc.
-
- Note, however, that it's not recommended from a security perspective to
- spit out diagnostics to any old user; ideally, you should have to do
- something special to enable diagnostic output, which is why we don't
- include any here!
- """
- start_response(self.error_status,self.error_headers[:],sys.exc_info())
- return [self.error_body]
-
-
- # Pure abstract methods; *must* be overridden in subclasses
-
- def _write(self,data):
- """Override in subclass to buffer data for send to client
-
- It's okay if this method actually transmits the data; BaseHandler
- just separates write and flush operations for greater efficiency
- when the underlying system actually has such a distinction.
- """
- raise NotImplementedError
-
- def _flush(self):
- """Override in subclass to force sending of recent '_write()' calls
-
- It's okay if this method is a no-op (i.e., if '_write()' actually
- sends the data.
- """
- raise NotImplementedError
-
- def get_stdin(self):
- """Override in subclass to return suitable 'wsgi.input'"""
- raise NotImplementedError
-
- def get_stderr(self):
- """Override in subclass to return suitable 'wsgi.errors'"""
- raise NotImplementedError
-
- def add_cgi_vars(self):
- """Override in subclass to insert CGI variables in 'self.environ'"""
- raise NotImplementedError
-
-
-
-
-
-
-
-
-
-
-
-class SimpleHandler(BaseHandler):
- """Handler that's just initialized with streams, environment, etc.
-
- This handler subclass is intended for synchronous HTTP/1.0 origin servers,
- and handles sending the entire response output, given the correct inputs.
-
- Usage::
-
- handler = BaseCGIHandler(
- inp,out,err,env, multithread=False, multiprocess=True
- )
- handler.run(app)"""
-
- def __init__(self,stdin,stdout,stderr,environ,
- multithread=True, multiprocess=False
- ):
- self.stdin = stdin
- self.stdout = stdout
- self.stderr = stderr
- self.base_env = environ
- self.wsgi_multithread = multithread
- self.wsgi_multiprocess = multiprocess
-
- def get_stdin(self):
- return self.stdin
-
- def get_stderr(self):
- return self.stderr
-
- def add_cgi_vars(self):
- self.environ.update(self.base_env)
-
- def _write(self,data):
- self.stdout.write(data)
- self._write = self.stdout.write
-
- def _flush(self):
- self.stdout.flush()
- self._flush = self.stdout.flush
-
-
-class BaseCGIHandler(SimpleHandler):
-
- """CGI-like systems using input/output/error streams and environ mapping
-
- Usage::
-
- handler = BaseCGIHandler(inp,out,err,env)
- handler.run(app)
-
- This handler class is useful for gateway protocols like ReadyExec and
- FastCGI, that have usable input/output/error streams and an environment
- mapping. It's also the base class for CGIHandler, which just uses
- sys.stdin, os.environ, and so on.
-
- The constructor also takes keyword arguments 'multithread' and
- 'multiprocess' (defaulting to 'True' and 'False' respectively) to control
- the configuration sent to the application. It sets 'origin_server' to
- False (to enable CGI-like output), and assumes that 'wsgi.run_once' is
- False.
- """
-
- origin_server = False
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-class CGIHandler(BaseCGIHandler):
-
- """CGI-based invocation via sys.stdin/stdout/stderr and os.environ
-
- Usage::
-
- CGIHandler().run(app)
-
- The difference between this class and BaseCGIHandler is that it always
- uses 'wsgi.run_once' of 'True', 'wsgi.multithread' of 'False', and
- 'wsgi.multiprocess' of 'True'. It does not take any initialization
- parameters, but always uses 'sys.stdin', 'os.environ', and friends.
-
- If you need to override any of these parameters, use BaseCGIHandler
- instead.
- """
-
- wsgi_run_once = True
-
- def __init__(self):
- BaseCGIHandler.__init__(
- self, sys.stdin, sys.stdout, sys.stderr, dict(os.environ.items()),
- multithread=False, multiprocess=True
- )
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/wsgiref/headers.py b/src/wsgiref/headers.py
deleted file mode 100644
index fa9b829..0000000
--- a/src/wsgiref/headers.py
+++ /dev/null
@@ -1,205 +0,0 @@
-"""Manage HTTP Response Headers
-
-Much of this module is red-handedly pilfered from email.Message in the stdlib,
-so portions are Copyright (C) 2001,2002 Python Software Foundation, and were
-written by Barry Warsaw.
-"""
-
-from types import ListType, TupleType
-
-# Regular expression that matches `special' characters in parameters, the
-# existance of which force quoting of the parameter value.
-import re
-tspecials = re.compile(r'[ \(\)<>@,;:\\"/\[\]\?=]')
-
-def _formatparam(param, value=None, quote=1):
- """Convenience function to format and return a key=value pair.
-
- This will quote the value if needed or if quote is true.
- """
- if value is not None and len(value) > 0:
- if quote or tspecials.search(value):
- value = value.replace('\\', '\\\\').replace('"', r'\"')
- return '%s="%s"' % (param, value)
- else:
- return '%s=%s' % (param, value)
- else:
- return param
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-class Headers:
-
- """Manage a collection of HTTP response headers"""
-
- def __init__(self,headers):
- if type(headers) is not ListType:
- raise TypeError("Headers must be a list of name/value tuples")
- self._headers = headers
-
- def __len__(self):
- """Return the total number of headers, including duplicates."""
- return len(self._headers)
-
- def __setitem__(self, name, val):
- """Set the value of a header."""
- del self[name]
- self._headers.append((name, val))
-
- def __delitem__(self,name):
- """Delete all occurrences of a header, if present.
-
- Does *not* raise an exception if the header is missing.
- """
- name = name.lower()
- self._headers[:] = [kv for kv in self._headers if kv[0].lower()<>name]
-
- def __getitem__(self,name):
- """Get the first header value for 'name'
-
- Return None if the header is missing instead of raising an exception.
-
- Note that if the header appeared multiple times, the first exactly which
- occurrance gets returned is undefined. Use getall() to get all
- the values matching a header field name.
- """
- return self.get(name)
-
-
-
-
-
- def has_key(self, name):
- """Return true if the message contains the header."""
- return self.get(name) is not None
-
- __contains__ = has_key
-
-
- def get_all(self, name):
- """Return a list of all the values for the named field.
-
- These will be sorted in the order they appeared in the original header
- list or were added to this instance, and may contain duplicates. Any
- fields deleted and re-inserted are always appended to the header list.
- If no fields exist with the given name, returns an empty list.
- """
- name = name.lower()
- return [kv[1] for kv in self._headers if kv[0].lower()==name]
-
-
- def get(self,name,default=None):
- """Get the first header value for 'name', or return 'default'"""
- name = name.lower()
- for k,v in self._headers:
- if k.lower()==name:
- return v
- return default
-
-
- def keys(self):
- """Return a list of all the header field names.
-
- These will be sorted in the order they appeared in the original header
- list, or were added to this instance, and may contain duplicates.
- Any fields deleted and re-inserted are always appended to the header
- list.
- """
- return [k for k, v in self._headers]
-
-
-
-
- def values(self):
- """Return a list of all header values.
-
- These will be sorted in the order they appeared in the original header
- list, or were added to this instance, and may contain duplicates.
- Any fields deleted and re-inserted are always appended to the header
- list.
- """
- return [v for k, v in self._headers]
-
- def items(self):
- """Get all the header fields and values.
-
- These will be sorted in the order they were in the original header
- list, or were added to this instance, and may contain duplicates.
- Any fields deleted and re-inserted are always appended to the header
- list.
- """
- return self._headers[:]
-
- def __repr__(self):
- return "Headers(%s)" % `self._headers`
-
- def __str__(self):
- """str() returns the formatted headers, complete with end line,
- suitable for direct HTTP transmission."""
- return '\r\n'.join(["%s: %s" % kv for kv in self._headers]+['',''])
-
- def setdefault(self,name,value):
- """Return first matching header value for 'name', or 'value'
-
- If there is no header named 'name', add a new header with name 'name'
- and value 'value'."""
- result = self.get(name)
- if result is None:
- self._headers.append((name,value))
- return value
- else:
- return result
-
-
- def add_header(self, _name, _value, **_params):
- """Extended header setting.
-
- _name is the header field to add. keyword arguments can be used to set
- additional parameters for the header field, with underscores converted
- to dashes. Normally the parameter will be added as key="value" unless
- value is None, in which case only the key will be added.
-
- Example:
-
- h.add_header('content-disposition', 'attachment', filename='bud.gif')
-
- Note that unlike the corresponding 'email.Message' method, this does
- *not* handle '(charset, language, value)' tuples: all values must be
- strings or None.
- """
- parts = []
- if _value is not None:
- parts.append(_value)
- for k, v in _params.items():
- if v is None:
- parts.append(k.replace('_', '-'))
- else:
- parts.append(_formatparam(k.replace('_', '-'), v))
- self._headers.append((_name, "; ".join(parts)))
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/wsgiref/simple_server.py b/src/wsgiref/simple_server.py
deleted file mode 100644
index d226359..0000000
--- a/src/wsgiref/simple_server.py
+++ /dev/null
@@ -1,205 +0,0 @@
-"""BaseHTTPServer that implements the Python WSGI protocol (PEP 333, rev 1.21)
-
-This is both an example of how WSGI can be implemented, and a basis for running
-simple web applications on a local machine, such as might be done when testing
-or debugging an application. It has not been reviewed for security issues,
-however, and we strongly recommend that you use a "real" web server for
-production use.
-
-For example usage, see the 'if __name__=="__main__"' block at the end of the
-module. See also the BaseHTTPServer module docs for other API information.
-"""
-
-from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
-import urllib, sys
-from wsgiref.handlers import SimpleHandler
-
-__version__ = "0.1"
-__all__ = ['WSGIServer','WSGIRequestHandler','demo_app']
-
-
-server_version = "WSGIServer/" + __version__
-sys_version = "Python/" + sys.version.split()[0]
-software_version = server_version + ' ' + sys_version
-
-
-class ServerHandler(SimpleHandler):
-
- server_software = software_version
-
- def close(self):
- try:
- self.request_handler.log_request(
- self.status.split(' ',1)[0], self.bytes_sent
- )
- finally:
- SimpleHandler.close(self)
-
-
-
-
-
-class WSGIServer(HTTPServer):
-
- """BaseHTTPServer that implements the Python WSGI protocol"""
-
- application = None
-
- def server_bind(self):
- """Override server_bind to store the server name."""
- HTTPServer.server_bind(self)
- self.setup_environ()
-
- def setup_environ(self):
- # Set up base environment
- env = self.base_environ = {}
- env['SERVER_NAME'] = self.server_name
- env['GATEWAY_INTERFACE'] = 'CGI/1.1'
- env['SERVER_PORT'] = str(self.server_port)
- env['REMOTE_HOST']=''
- env['CONTENT_LENGTH']=''
- env['SCRIPT_NAME'] = ''
-
- def get_app(self):
- return self.application
-
- def set_app(self,application):
- self.application = application
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-class WSGIRequestHandler(BaseHTTPRequestHandler):
-
- server_version = "WSGIServer/" + __version__
-
- def get_environ(self):
- env = self.server.base_environ.copy()
- env['SERVER_PROTOCOL'] = self.request_version
- env['REQUEST_METHOD'] = self.command
- if '?' in self.path:
- path,query = self.path.split('?',1)
- else:
- path,query = self.path,''
-
- env['PATH_INFO'] = urllib.unquote(path)
- env['QUERY_STRING'] = query
-
- host = self.address_string()
- if host != self.client_address[0]:
- env['REMOTE_HOST'] = host
- env['REMOTE_ADDR'] = self.client_address[0]
-
- if self.headers.typeheader is None:
- env['CONTENT_TYPE'] = self.headers.type
- else:
- env['CONTENT_TYPE'] = self.headers.typeheader
-
- length = self.headers.getheader('content-length')
- if length:
- env['CONTENT_LENGTH'] = length
-
- for h in self.headers.headers:
- k,v = h.split(':',1)
- k=k.replace('-','_').upper(); v=v.strip()
- if k in env:
- continue # skip content length, type,etc.
- if 'HTTP_'+k in env:
- env['HTTP_'+k] += ','+v # comma-separate multiple headers
- else:
- env['HTTP_'+k] = v
- return env
-
- def get_stderr(self):
- return sys.stderr
-
- def handle(self):
- """Handle a single HTTP request"""
-
- self.raw_requestline = self.rfile.readline()
- if not self.parse_request(): # An error code has been sent, just exit
- return
-
- handler = ServerHandler(
- self.rfile, self.wfile, self.get_stderr(), self.get_environ()
- )
- handler.request_handler = self # backpointer for logging
- handler.run(self.server.get_app())
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-def demo_app(environ,start_response):
- from StringIO import StringIO
- stdout = StringIO()
- print >>stdout, "Hello world!"
- print >>stdout
- h = environ.items(); h.sort()
- for k,v in h:
- print >>stdout, k,'=',`v`
- start_response("200 OK", [('Content-Type','text/plain')])
- return [stdout.getvalue()]
-
-
-if __name__ == '__main__':
- server_address = ('', 8000)
- httpd = WSGIServer(server_address, WSGIRequestHandler)
- httpd.set_app(demo_app)
- sa = httpd.socket.getsockname()
- print "Serving HTTP on", sa[0], "port", sa[1], "..."
- import webbrowser
- webbrowser.open('http://localhost:8000/xyz?abc')
- httpd.handle_request() # serve one request, then exit
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/wsgiref/tests/__init__.py b/src/wsgiref/tests/__init__.py
deleted file mode 100644
index 6b482a0..0000000
--- a/src/wsgiref/tests/__init__.py
+++ /dev/null
@@ -1,82 +0,0 @@
-from unittest import TestSuite, TestCase, makeSuite
-
-def compare_generic_iter(make_it,match):
- """Utility to compare a generic 2.1/2.2+ iterator with an iterable
-
- If running under Python 2.2+, this tests the iterator using iter()/next(),
- as well as __getitem__. 'make_it' must be a function returning a fresh
- iterator to be tested (since this may test the iterator twice)."""
-
- it = make_it()
- n = 0
- for item in match:
- assert it[n]==item
- n+=1
- try:
- it[n]
- except IndexError:
- pass
- else:
- raise AssertionError("Too many items from __getitem__",it)
-
- try:
- iter, StopIteration
- except NameError:
- pass
- else:
- # Only test iter mode under 2.2+
- it = make_it()
- assert iter(it) is it
- for item in match:
- assert it.next()==item
- try:
- it.next()
- except StopIteration:
- pass
- else:
- raise AssertionError("Too many items from .next()",it)
-
-
-
-
-def test_suite():
-
- from wsgiref.tests import test_util
- from wsgiref.tests import test_headers
- from wsgiref.tests import test_handlers
-
- tests = [
- test_util.test_suite(),
- test_headers.test_suite(),
- test_handlers.test_suite(),
- ]
-
- return TestSuite(tests)
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/wsgiref/tests/test_handlers.py b/src/wsgiref/tests/test_handlers.py
deleted file mode 100644
index 80cbb93..0000000
--- a/src/wsgiref/tests/test_handlers.py
+++ /dev/null
@@ -1,246 +0,0 @@
-from __future__ import nested_scopes # Backward compat for 2.1
-from unittest import TestCase, TestSuite, makeSuite
-from wsgiref.util import setup_testing_defaults
-from wsgiref.headers import Headers
-from wsgiref.handlers import BaseHandler, BaseCGIHandler
-from StringIO import StringIO
-import re
-
-class ErrorHandler(BaseCGIHandler):
- """Simple handler subclass for testing BaseHandler"""
-
- def __init__(self,**kw):
- setup_testing_defaults(kw)
- BaseCGIHandler.__init__(
- self, StringIO(''), StringIO(), StringIO(), kw,
- multithread=True, multiprocess=True
- )
-
-class TestHandler(ErrorHandler):
- """Simple handler subclass for testing BaseHandler, w/error passthru"""
-
- def handle_error(self):
- raise # for testing, we want to see what's happening
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-class HandlerTests(TestCase):
-
- def checkEnvironAttrs(self, handler):
- env = handler.environ
- for attr in [
- 'version','multithread','multiprocess','run_once','file_wrapper'
- ]:
- if attr=='file_wrapper' and handler.wsgi_file_wrapper is None:
- continue
- self.assertEqual(getattr(handler,'wsgi_'+attr),env['wsgi.'+attr])
-
- def checkOSEnviron(self,handler):
- empty = {}; setup_testing_defaults(empty)
- env = handler.environ
- from os import environ
- for k,v in environ.items():
- if not empty.has_key(k):
- self.assertEqual(env[k],v)
- for k,v in empty.items():
- self.failUnless(env.has_key(k))
-
- def testEnviron(self):
- h = TestHandler(X="Y")
- h.setup_environ()
- self.checkEnvironAttrs(h)
- self.checkOSEnviron(h)
- self.assertEqual(h.environ["X"],"Y")
-
- def testCGIEnviron(self):
- h = BaseCGIHandler(None,None,None,{})
- h.setup_environ()
- for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
- assert h.environ.has_key(key)
-
- def testScheme(self):
- h=TestHandler(HTTPS="on"); h.setup_environ()
- self.assertEqual(h.environ['wsgi.url_scheme'],'https')
- h=TestHandler(); h.setup_environ()
- self.assertEqual(h.environ['wsgi.url_scheme'],'http')
-
-
- def testAbstractMethods(self):
- h = BaseHandler()
- for name in [
- '_flush','get_stdin','get_stderr','add_cgi_vars'
- ]:
- self.assertRaises(NotImplementedError, getattr(h,name))
- self.assertRaises(NotImplementedError, h._write, "test")
-
-
- def testContentLength(self):
- # Demo one reason iteration is better than write()... ;)
-
- def trivial_app1(e,s):
- s('200 OK',[])
- return [e['wsgi.url_scheme']]
-
- def trivial_app2(e,s):
- s('200 OK',[])(e['wsgi.url_scheme'])
- return []
-
- h = TestHandler()
- h.run(trivial_app1)
- self.assertEqual(h.stdout.getvalue(),
- "Status: 200 OK\r\n"
- "Content-Length: 4\r\n"
- "\r\n"
- "http")
-
- h = TestHandler()
- h.run(trivial_app2)
- self.assertEqual(h.stdout.getvalue(),
- "Status: 200 OK\r\n"
- "\r\n"
- "http")
-
-
-
-
-
-
-
- def testBasicErrorOutput(self):
-
- def non_error_app(e,s):
- s('200 OK',[])
- return []
-
- def error_app(e,s):
- raise AssertionError("This should be caught by handler")
-
- h = ErrorHandler()
- h.run(non_error_app)
- self.assertEqual(h.stdout.getvalue(),
- "Status: 200 OK\r\n"
- "Content-Length: 0\r\n"
- "\r\n")
- self.assertEqual(h.stderr.getvalue(),"")
-
- h = ErrorHandler()
- h.run(error_app)
- self.assertEqual(h.stdout.getvalue(),
- "Status: %s\r\n"
- "Content-Type: text/plain\r\n"
- "Content-Length: %d\r\n"
- "\r\n%s" % (h.error_status,len(h.error_body),h.error_body))
-
- self.failUnless(h.stderr.getvalue().find("AssertionError")<>-1)
-
- def testErrorAfterOutput(self):
- MSG = "Some output has been sent"
- def error_app(e,s):
- s("200 OK",[])(MSG)
- raise AssertionError("This should be caught by handler")
-
- h = ErrorHandler()
- h.run(error_app)
- self.assertEqual(h.stdout.getvalue(),
- "Status: 200 OK\r\n"
- "\r\n"+MSG)
- self.failUnless(h.stderr.getvalue().find("AssertionError")<>-1)
-
-
- def testHeaderFormats(self):
-
- def non_error_app(e,s):
- s('200 OK',[])
- return []
-
- stdpat = (
- r"HTTP/%s 200 OK\r\n"
- r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
- r"%s" r"Content-Length: 0\r\n" r"\r\n"
- )
- shortpat = (
- "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
- )
-
- for ssw in "FooBar/1.0", None:
- sw = ssw and "Server: %s\r\n" % ssw or ""
-
- for version in "1.0", "1.1":
- for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":
-
- h = TestHandler(SERVER_PROTOCOL=proto)
- h.origin_server = False
- h.http_version = version
- h.server_software = ssw
- h.run(non_error_app)
- self.assertEqual(shortpat,h.stdout.getvalue())
-
- h = TestHandler(SERVER_PROTOCOL=proto)
- h.origin_server = True
- h.http_version = version
- h.server_software = ssw
- h.run(non_error_app)
- if proto=="HTTP/0.9":
- self.assertEqual(h.stdout.getvalue(),"")
- else:
- self.failUnless(
- re.match(stdpat%(version,sw), h.stdout.getvalue()),
- (stdpat%(version,sw), h.stdout.getvalue())
- )
-
-TestClasses = (
- HandlerTests,
-)
-
-def test_suite():
- return TestSuite([makeSuite(t,'test') for t in TestClasses])
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/wsgiref/tests/test_headers.py b/src/wsgiref/tests/test_headers.py
deleted file mode 100644
index db9afc3..0000000
--- a/src/wsgiref/tests/test_headers.py
+++ /dev/null
@@ -1,123 +0,0 @@
-from unittest import TestCase, TestSuite, makeSuite
-from wsgiref.headers import Headers
-from wsgiref.tests import compare_generic_iter
-
-class HeaderTests(TestCase):
-
- def testMappingInterface(self):
- test = [('x','y')]
- self.assertEqual(len(Headers([])),0)
- self.assertEqual(len(Headers(test[:])),1)
- self.assertEqual(Headers(test[:]).keys(), ['x'])
- self.assertEqual(Headers(test[:]).values(), ['y'])
- self.assertEqual(Headers(test[:]).items(), test)
- self.failIf(Headers(test).items() is test) # must be copy!
-
- h=Headers([])
- del h['foo'] # should not raise an error
-
- h['Foo'] = 'bar'
- for m in h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__:
- self.failUnless(m('foo'))
- self.failUnless(m('Foo'))
- self.failUnless(m('FOO'))
- self.failIf(m('bar'))
-
- self.assertEqual(h['foo'],'bar')
- h['foo'] = 'baz'
- self.assertEqual(h['FOO'],'baz')
- self.assertEqual(h.get_all('foo'),['baz'])
-
- self.assertEqual(h.get("foo","whee"), "baz")
- self.assertEqual(h.get("zoo","whee"), "whee")
- self.assertEqual(h.setdefault("foo","whee"), "baz")
- self.assertEqual(h.setdefault("zoo","whee"), "whee")
- self.assertEqual(h["foo"],"baz")
- self.assertEqual(h["zoo"],"whee")
-
- def testRequireList(self):
- self.assertRaises(TypeError, Headers, "foo")
-
-
- def testExtras(self):
- h = Headers([])
- self.assertEqual(str(h),'\r\n')
-
- h.add_header('foo','bar',baz="spam")
- self.assertEqual(h['foo'], 'bar; baz="spam"')
- self.assertEqual(str(h),'foo: bar; baz="spam"\r\n\r\n')
-
- h.add_header('Foo','bar',cheese=None)
- self.assertEqual(h.get_all('foo'),
- ['bar; baz="spam"', 'bar; cheese'])
-
- self.assertEqual(str(h),
- 'foo: bar; baz="spam"\r\n'
- 'Foo: bar; cheese\r\n'
- '\r\n'
- )
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-TestClasses = (
- HeaderTests,
-)
-
-def test_suite():
- return TestSuite([makeSuite(t,'test') for t in TestClasses])
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/wsgiref/tests/test_util.py b/src/wsgiref/tests/test_util.py
deleted file mode 100644
index b0d121f..0000000
--- a/src/wsgiref/tests/test_util.py
+++ /dev/null
@@ -1,205 +0,0 @@
-from unittest import TestCase, TestSuite, makeSuite
-from wsgiref import util
-from wsgiref.tests import compare_generic_iter
-from StringIO import StringIO
-
-class UtilityTests(TestCase):
-
- def checkShift(self,sn_in,pi_in,part,sn_out,pi_out):
- env = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in}
- util.setup_testing_defaults(env)
- self.assertEqual(util.shift_path_info(env),part)
- self.assertEqual(env['PATH_INFO'],pi_out)
- self.assertEqual(env['SCRIPT_NAME'],sn_out)
- return env
-
- def checkDefault(self, key, value, alt=None):
- # Check defaulting when empty
- env = {}
- util.setup_testing_defaults(env)
- if isinstance(value,StringIO):
- self.failUnless(isinstance(env[key],StringIO))
- else:
- self.assertEqual(env[key],value)
-
- # Check existing value
- env = {key:alt}
- util.setup_testing_defaults(env)
- self.failUnless(env[key] is alt)
-
- def checkCrossDefault(self,key,value,**kw):
- util.setup_testing_defaults(kw)
- self.assertEqual(kw[key],value)
-
- def checkAppURI(self,uri,**kw):
- util.setup_testing_defaults(kw)
- self.assertEqual(util.application_uri(kw),uri)
-
- def checkReqURI(self,uri,query=1,**kw):
- util.setup_testing_defaults(kw)
- self.assertEqual(util.request_uri(kw,query),uri)
-
- def checkFW(self,text,size,match):
-
- def make_it(text=text,size=size):
- return util.FileWrapper(StringIO(text),size)
-
- compare_generic_iter(make_it,match)
-
- it = make_it()
- self.failIf(it.filelike.closed)
-
- for item in it:
- pass
-
- self.failIf(it.filelike.closed)
-
- it.close()
- self.failUnless(it.filelike.closed)
-
-
- def testSimpleShifts(self):
- self.checkShift('','/', '', '/', '')
- self.checkShift('','/x', 'x', '/x', '')
- self.checkShift('/','', None, '/', '')
- self.checkShift('/a','/x/y', 'x', '/a/x', '/y')
- self.checkShift('/a','/x/', 'x', '/a/x', '/')
-
-
- def testNormalizedShifts(self):
- self.checkShift('/a/b', '/../y', '..', '/a', '/y')
- self.checkShift('', '/../y', '..', '', '/y')
- self.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
- self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
- self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
- self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
- self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
- self.checkShift('/a/b', '///', '', '/a/b/', '')
- self.checkShift('/a/b', '/.//', '', '/a/b/', '')
- self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
- self.checkShift('/a/b', '/.', None, '/a/b', '')
-
-
- def testDefaults(self):
- for key, value in [
- ('SERVER_NAME','127.0.0.1'),
- ('SERVER_PORT', '80'),
- ('SERVER_PROTOCOL','HTTP/1.0'),
- ('HTTP_HOST','127.0.0.1'),
- ('REQUEST_METHOD','GET'),
- ('SCRIPT_NAME',''),
- ('PATH_INFO','/'),
- ('wsgi.version', (1,0)),
- ('wsgi.run_once', 0),
- ('wsgi.multithread', 0),
- ('wsgi.multiprocess', 0),
- ('wsgi.input', StringIO("")),
- ('wsgi.errors', StringIO()),
- ('wsgi.url_scheme','http'),
- ]:
- self.checkDefault(key,value)
-
-
- def testCrossDefaults(self):
- self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar")
- self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on")
- self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1")
- self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes")
- self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo")
- self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo")
- self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on")
-
-
- def testGuessScheme(self):
- self.assertEqual(util.guess_scheme({}), "http")
- self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http")
- self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https")
- self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https")
- self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https")
-
-
-
-
-
- def testAppURIs(self):
- self.checkAppURI("http://127.0.0.1/")
- self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
- self.checkAppURI("http://spam.example.com:2071/",
- HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
- self.checkAppURI("http://spam.example.com/",
- SERVER_NAME="spam.example.com")
- self.checkAppURI("http://127.0.0.1/",
- HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")
- self.checkAppURI("https://127.0.0.1/", HTTPS="on")
- self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000",
- HTTP_HOST=None)
-
- def testReqURIs(self):
- self.checkReqURI("http://127.0.0.1/")
- self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
- self.checkReqURI("http://127.0.0.1/spammity/spam",
- SCRIPT_NAME="/spammity", PATH_INFO="/spam")
- self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
- SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
- self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
- SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
-
- def testFileWrapper(self):
- self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])
-
- def testHopByHop(self):
- for hop in (
- "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
- "TE Trailers Transfer-Encoding Upgrade"
- ).split():
- for alt in hop, hop.title(), hop.upper(), hop.lower():
- self.failUnless(util.is_hop_by_hop(alt))
-
- # Not comprehensive, just a few random header names
- for hop in (
- "Accept Cache-Control Date Pragma Trailer Via Warning"
- ).split():
- for alt in hop, hop.title(), hop.upper(), hop.lower():
- self.failIf(util.is_hop_by_hop(alt))
-
-TestClasses = (
- UtilityTests,
-)
-
-def test_suite():
- return TestSuite([makeSuite(t,'test') for t in TestClasses])
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/wsgiref/util.py b/src/wsgiref/util.py
deleted file mode 100644
index 0f805ec..0000000
--- a/src/wsgiref/util.py
+++ /dev/null
@@ -1,205 +0,0 @@
-"""Miscellaneous WSGI-related Utilities"""
-
-import posixpath
-
-__all__ = [
- 'FileWrapper', 'guess_scheme', 'application_uri', 'request_uri',
- 'shift_path_info', 'setup_testing_defaults',
-]
-
-
-class FileWrapper:
- """Wrapper to convert file-like objects to iterables"""
-
- def __init__(self, filelike, blksize=8192):
- self.filelike = filelike
- self.blksize = blksize
- if hasattr(filelike,'close'):
- self.close = filelike.close
-
- def __getitem__(self,key):
- data = self.filelike.read(self.blksize)
- if data:
- return data
- raise IndexError
-
- def __iter__(self):
- return self
-
- def next(self):
- data = self.filelike.read(self.blksize)
- if data:
- return data
- raise StopIteration
-
-
-
-
-
-
-
-
-def guess_scheme(environ):
- """Return a guess for whether 'wsgi.url_scheme' should be 'http' or 'https'
- """
- if environ.get("HTTPS") in ('yes','on','1'):
- return 'https'
- else:
- return 'http'
-
-def application_uri(environ):
- """Return the application's base URI (no PATH_INFO or QUERY_STRING)"""
- url = environ['wsgi.url_scheme']+'://'
- from urllib import quote
-
- if environ.get('HTTP_HOST'):
- url += environ['HTTP_HOST']
- else:
- url += environ['SERVER_NAME']
-
- if environ['wsgi.url_scheme'] == 'https':
- if environ['SERVER_PORT'] != '443':
- url += ':' + environ['SERVER_PORT']
- else:
- if environ['SERVER_PORT'] != '80':
- url += ':' + environ['SERVER_PORT']
-
- url += quote(environ.get('SCRIPT_NAME') or '/')
- return url
-
-def request_uri(environ,include_query=1):
- """Return the full request URI, optionally including the query string"""
- url = application_uri(environ)
- from urllib import quote
- path_info = quote(environ.get('PATH_INFO',''))
- if not environ.get('SCRIPT_NAME'):
- url += path_info[1:]
- else:
- url += path_info
- if include_query and environ.get('QUERY_STRING'):
- url += '?' + environ['QUERY_STRING']
- return url
-
-def shift_path_info(environ):
- """Shift a name from PATH_INFO to SCRIPT_NAME, returning it
-
- If there are no remaining path segments in PATH_INFO, return None.
- Note: 'environ' is modified in-place; use a copy if you need to keep
- the original PATH_INFO or SCRIPT_NAME.
-
- Note: when PATH_INFO is just a '/', this returns '' and appends a trailing
- '/' to SCRIPT_NAME, even though empty path segments are normally ignored,
- and SCRIPT_NAME doesn't normally end in a '/'. This is intentional
- behavior, to ensure that an application can tell the difference between
- '/x' and '/x/' when traversing to objects.
- """
- path_info = environ.get('PATH_INFO','')
- if not path_info:
- return None
-
- path_parts = path_info.split('/')
- path_parts[1:-1] = [p for p in path_parts[1:-1] if p and p<>'.']
- name = path_parts[1]
- del path_parts[1]
-
- script_name = environ.get('SCRIPT_NAME','')
- script_name = posixpath.normpath(script_name+'/'+name)
- if script_name.endswith('/'):
- script_name = script_name[:-1]
- if not name and not script_name.endswith('/'):
- script_name += '/'
-
- environ['SCRIPT_NAME'] = script_name
- environ['PATH_INFO'] = '/'.join(path_parts)
-
- # Special case: '/.' on PATH_INFO doesn't get stripped,
- # because we don't strip the last element of PATH_INFO
- # if there's only one path part left. Instead of fixing this
- # above, we fix it here so that PATH_INFO gets normalized to
- # an empty string in the environ.
- if name=='.':
- name = None
- return name
-
-def setup_testing_defaults(environ):
- """Update 'environ' with trivial defaults for testing purposes
-
- This adds various parameters required for WSGI, including HTTP_HOST,
- SERVER_NAME, SERVER_PORT, REQUEST_METHOD, SCRIPT_NAME, PATH_INFO,
- and all of the wsgi.* variables. It only supplies default values,
- and does not replace any existing settings for these variables.
-
- This routine is intended to make it easier for unit tests of WSGI
- servers and applications to set up dummy environments. It should *not*
- be used by actual WSGI servers or applications, since the data is fake!
- """
-
- environ.setdefault('SERVER_NAME','127.0.0.1')
- environ.setdefault('SERVER_PROTOCOL','HTTP/1.0')
-
- environ.setdefault('HTTP_HOST',environ['SERVER_NAME'])
- environ.setdefault('REQUEST_METHOD','GET')
-
- if 'SCRIPT_NAME' not in environ and 'PATH_INFO' not in environ:
- environ.setdefault('SCRIPT_NAME','')
- environ.setdefault('PATH_INFO','/')
-
- environ.setdefault('wsgi.version', (1,0))
- environ.setdefault('wsgi.run_once', 0)
- environ.setdefault('wsgi.multithread', 0)
- environ.setdefault('wsgi.multiprocess', 0)
-
- from StringIO import StringIO
- environ.setdefault('wsgi.input', StringIO(""))
- environ.setdefault('wsgi.errors', StringIO())
- environ.setdefault('wsgi.url_scheme',guess_scheme(environ))
-
- if environ['wsgi.url_scheme']=='http':
- environ.setdefault('SERVER_PORT', '80')
- elif environ['wsgi.url_scheme']=='https':
- environ.setdefault('SERVER_PORT', '443')
-
-
-
-
-_hoppish = {
- 'connection':1, 'keep-alive':1, 'proxy-authenticate':1,
- 'proxy-authorization':1, 'te':1, 'trailers':1, 'transfer-encoding':1,
- 'upgrade':1
-}.has_key
-
-def is_hop_by_hop(header_name):
- """Return true if 'header_name' is an HTTP/1.1 "Hop-by-Hop" header"""
- return _hoppish(header_name.lower())
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/src/wsgiref/validate.py b/src/wsgiref/validate.py
deleted file mode 100755
index af903df..0000000
--- a/src/wsgiref/validate.py
+++ /dev/null
@@ -1,429 +0,0 @@
-# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
-# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
-# Also licenced under the Apache License, 2.0: http://opensource.org/licenses/apache2.0.php
-# Licensed to PSF under a Contributor Agreement
-"""
-Middleware to check for obedience to the WSGI specification.
-
-Some of the things this checks:
-
-* Signature of the application and start_response (including that
- keyword arguments are not used).
-
-* Environment checks:
-
- - Environment is a dictionary (and not a subclass).
-
- - That all the required keys are in the environment: REQUEST_METHOD,
- SERVER_NAME, SERVER_PORT, wsgi.version, wsgi.input, wsgi.errors,
- wsgi.multithread, wsgi.multiprocess, wsgi.run_once
-
- - That HTTP_CONTENT_TYPE and HTTP_CONTENT_LENGTH are not in the
- environment (these headers should appear as CONTENT_LENGTH and
- CONTENT_TYPE).
-
- - Warns if QUERY_STRING is missing, as the cgi module acts
- unpredictably in that case.
-
- - That CGI-style variables (that don't contain a .) have
- (non-unicode) string values
-
- - That wsgi.version is a tuple
-
- - That wsgi.url_scheme is 'http' or 'https' (@@: is this too
- restrictive?)
-
- - Warns if the REQUEST_METHOD is not known (@@: probably too
- restrictive).
-
- - That SCRIPT_NAME and PATH_INFO are empty or start with /
-
- - That at least one of SCRIPT_NAME or PATH_INFO are set.
-
- - That CONTENT_LENGTH is a positive integer.
-
- - That SCRIPT_NAME is not '/' (it should be '', and PATH_INFO should
- be '/').
-
- - That wsgi.input has the methods read, readline, readlines, and
- __iter__
-
- - That wsgi.errors has the methods flush, write, writelines
-
-* The status is a string, contains a space, starts with an integer,
- and that integer is in range (> 100).
-
-* That the headers is a list (not a subclass, not another kind of
- sequence).
-
-* That the items of the headers are tuples of strings.
-
-* That there is no 'status' header (that is used in CGI, but not in
- WSGI).
-
-* That the headers don't contain newlines or colons, end in _ or -, or
- contain characters codes below 037.
-
-* That Content-Type is given if there is content (CGI often has a
- default content type, but WSGI does not).
-
-* That no Content-Type is given when there is no content (@@: is this
- too restrictive?)
-
-* That the exc_info argument to start_response is a tuple or None.
-
-* That all calls to the writer are with strings, and no other methods
- on the writer are accessed.
-
-* That wsgi.input is used properly:
-
- - .read() is called with zero or one argument
-
- - That it returns a string
-
- - That readline, readlines, and __iter__ return strings
-
- - That .close() is not called
-
- - No other methods are provided
-
-* That wsgi.errors is used properly:
-
- - .write() and .writelines() is called with a string
-
- - That .close() is not called, and no other methods are provided.
-
-* The response iterator:
-
- - That it is not a string (it should be a list of a single string; a
- string will work, but perform horribly).
-
- - That .next() returns a string
-
- - That the iterator is not iterated over until start_response has
- been called (that can signal either a server or application
- error).
-
- - That .close() is called (doesn't raise exception, only prints to
- sys.stderr, because we only know it isn't called when the object
- is garbage collected).
-"""
-__all__ = ['middleware']
-
-
-import re
-import sys
-from types import DictType, StringType, TupleType, ListType
-import warnings
-
-header_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9\-_]*$')
-bad_header_value_re = re.compile(r'[\000-\037]')
-
-class WSGIWarning(Warning):
- """
- Raised in response to WSGI-spec-related warnings
- """
-
-def middleware(application):
-
- """
- When applied between a WSGI server and a WSGI application, this
- middleware will check for WSGI compliancy on a number of levels.
- This middleware does not modify the request or response in any
- way, but will throw an AssertionError if anything seems off
- (except for a failure to close the application iterator, which
- will be printed to stderr -- there's no way to throw an exception
- at that point).
- """
-
- def lint_app(*args, **kw):
- assert len(args) == 2, "Two arguments required"
- assert not kw, "No keyword arguments allowed"
- environ, start_response = args
-
- check_environ(environ)
-
- # We use this to check if the application returns without
- # calling start_response:
- start_response_started = []
-
- def start_response_wrapper(*args, **kw):
- assert len(args) == 2 or len(args) == 3, (
- "Invalid number of arguments: %s" % args)
- assert not kw, "No keyword arguments allowed"
- status = args[0]
- headers = args[1]
- if len(args) == 3:
- exc_info = args[2]
- else:
- exc_info = None
-
- check_status(status)
- check_headers(headers)
- check_content_type(status, headers)
- check_exc_info(exc_info)
-
- start_response_started.append(None)
- return WriteWrapper(start_response(*args))
-
- environ['wsgi.input'] = InputWrapper(environ['wsgi.input'])
- environ['wsgi.errors'] = ErrorWrapper(environ['wsgi.errors'])
-
- iterator = application(environ, start_response_wrapper)
- assert iterator is not None and iterator != False, (
- "The application must return an iterator, if only an empty list")
-
- check_iterator(iterator)
-
- return IteratorWrapper(iterator, start_response_started)
-
- return lint_app
-
-class InputWrapper:
-
- def __init__(self, wsgi_input):
- self.input = wsgi_input
-
- def read(self, *args):
- assert len(args) <= 1
- v = self.input.read(*args)
- assert type(v) is type("")
- return v
-
- def readline(self):
- v = self.input.readline()
- assert type(v) is type("")
- return v
-
- def readlines(self, *args):
- assert len(args) <= 1
- lines = self.input.readlines(*args)
- assert type(lines) is type([])
- for line in lines:
- assert type(line) is type("")
- return lines
-
- def __iter__(self):
- while 1:
- line = self.readline()
- if not line:
- return
- yield line
-
- def close(self):
- assert 0, "input.close() must not be called"
-
-class ErrorWrapper:
-
- def __init__(self, wsgi_errors):
- self.errors = wsgi_errors
-
- def write(self, s):
- assert type(s) is type("")
- self.errors.write(s)
-
- def flush(self):
- self.errors.flush()
-
- def writelines(self, seq):
- for line in seq:
- self.write(line)
-
- def close(self):
- assert 0, "errors.close() must not be called"
-
-class WriteWrapper:
-
- def __init__(self, wsgi_writer):
- self.writer = wsgi_writer
-
- def __call__(self, s):
- assert type(s) is type("")
- self.writer(s)
-
-class PartialIteratorWrapper:
-
- def __init__(self, wsgi_iterator):
- self.iterator = wsgi_iterator
-
- def __iter__(self):
- # We want to make sure __iter__ is called
- return IteratorWrapper(self.iterator)
-
-class IteratorWrapper:
-
- def __init__(self, wsgi_iterator, check_start_response):
- self.original_iterator = wsgi_iterator
- self.iterator = iter(wsgi_iterator)
- self.closed = False
- self.check_start_response = check_start_response
-
- def __iter__(self):
- return self
-
- def next(self):
- assert not self.closed, (
- "Iterator read after closed")
- v = self.iterator.next()
- if self.check_start_response is not None:
- assert self.check_start_response, (
- "The application returns and we started iterating over its body, but start_response has not yet been called")
- self.check_start_response = None
- return v
-
- def close(self):
- self.closed = True
- if hasattr(self.original_iterator, 'close'):
- self.original_iterator.close()
-
- def __del__(self):
- if not self.closed:
- sys.stderr.write(
- "Iterator garbage collected without being closed")
- assert self.closed, (
- "Iterator garbage collected without being closed")
-
-def check_environ(environ):
- assert type(environ) is DictType, (
- "Environment is not of the right type: %r (environment: %r)"
- % (type(environ), environ))
-
- for key in ['REQUEST_METHOD', 'SERVER_NAME', 'SERVER_PORT',
- 'wsgi.version', 'wsgi.input', 'wsgi.errors',
- 'wsgi.multithread', 'wsgi.multiprocess',
- 'wsgi.run_once']:
- assert key in environ, (
- "Environment missing required key: %r" % key)
-
- for key in ['HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH']:
- assert key not in environ, (
- "Environment should not have the key: %s "
- "(use %s instead)" % (key, key[5:]))
-
- if 'QUERY_STRING' not in environ:
- warnings.warn(
- 'QUERY_STRING is not in the WSGI environment; the cgi '
- 'module will use sys.argv when this variable is missing, '
- 'so application errors are more likely',
- WSGIWarning)
-
- for key in environ.keys():
- if '.' in key:
- # Extension, we don't care about its type
- continue
- assert type(environ[key]) is StringType, (
- "Environmental variable %s is not a string: %r (value: %r)"
- % (type(environ[key]), environ[key]))
-
- assert type(environ['wsgi.version']) is TupleType, (
- "wsgi.version should be a tuple (%r)" % environ['wsgi.version'])
- assert environ['wsgi.url_scheme'] in ('http', 'https'), (
- "wsgi.url_scheme unknown: %r" % environ['wsgi.url_scheme'])
-
- check_input(environ['wsgi.input'])
- check_errors(environ['wsgi.errors'])
-
- # @@: these need filling out:
- if environ['REQUEST_METHOD'] not in (
- 'GET', 'HEAD', 'POST', 'OPTIONS','PUT','DELETE','TRACE'):
- warnings.warn(
- "Unknown REQUEST_METHOD: %r" % environ['REQUEST_METHOD'],
- WSGIWarning)
-
- assert (not environ.get('SCRIPT_NAME')
- or environ['SCRIPT_NAME'].startswith('/')), (
- "SCRIPT_NAME doesn't start with /: %r" % environ['SCRIPT_NAME'])
- assert (not environ.get('PATH_INFO')
- or environ['PATH_INFO'].startswith('/')), (
- "PATH_INFO doesn't start with /: %r" % environ['PATH_INFO'])
- if environ.get('CONTENT_LENGTH'):
- assert int(environ['CONTENT_LENGTH']) >= 0, (
- "Invalid CONTENT_LENGTH: %r" % environ['CONTENT_LENGTH'])
-
- if not environ.get('SCRIPT_NAME'):
- assert environ.has_key('PATH_INFO'), (
- "One of SCRIPT_NAME or PATH_INFO are required (PATH_INFO "
- "should at least be '/' if SCRIPT_NAME is empty)")
- assert environ.get('SCRIPT_NAME') != '/', (
- "SCRIPT_NAME cannot be '/'; it should instead be '', and "
- "PATH_INFO should be '/'")
-
-def check_input(wsgi_input):
- for attr in ['read', 'readline', 'readlines', '__iter__']:
- assert hasattr(wsgi_input, attr), (
- "wsgi.input (%r) doesn't have the attribute %s"
- % (wsgi_input, attr))
-
-def check_errors(wsgi_errors):
- for attr in ['flush', 'write', 'writelines']:
- assert hasattr(wsgi_errors, attr), (
- "wsgi.errors (%r) doesn't have the attribute %s"
- % (wsgi_errors, attr))
-
-def check_status(status):
- assert type(status) is StringType, (
- "Status must be a string (not %r)" % status)
- # Implicitly check that we can turn it into an integer:
- status_code = status.split(None, 1)[0]
- assert len(status_code) == 3, (
- "Status codes must be three characters: %r" % status_code)
- status_int = int(status_code)
- assert status_int >= 100, "Status code is invalid: %r" % status_int
- if len(status) < 4 or status[3] != ' ':
- warnings.warn(
- "The status string (%r) should be a three-digit integer "
- "followed by a single space and a status explanation"
- % status, WSGIWarning)
-
-def check_headers(headers):
- assert type(headers) is ListType, (
- "Headers (%r) must be of type list: %r"
- % (headers, type(headers)))
- header_names = {}
- for item in headers:
- assert type(item) is TupleType, (
- "Individual headers (%r) must be of type tuple: %r"
- % (item, type(item)))
- assert len(item) == 2
- name, value = item
- assert name.lower() != 'status', (
- "The Status header cannot be used; it conflicts with CGI "
- "script, and HTTP status is not given through headers "
- "(value: %r)." % value)
- header_names[name.lower()] = None
- assert '\n' not in name and ':' not in name, (
- "Header names may not contain ':' or '\\n': %r" % name)
- assert header_re.search(name), "Bad header name: %r" % name
- assert not name.endswith('-') and not name.endswith('_'), (
- "Names may not end in '-' or '_': %r" % name)
- assert not bad_header_value_re.search(value), (
- "Bad header value: %r (bad char: %r)"
- % (value, bad_header_value_re.search(value).group(0)))
-
-def check_content_type(status, headers):
- code = int(status.split(None, 1)[0])
- # @@: need one more person to verify this interpretation of RFC 2616
- # http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
- NO_MESSAGE_BODY = (204, 304)
- for name, value in headers:
- if name.lower() == 'content-type':
- if code not in NO_MESSAGE_BODY:
- return
- assert 0, (("Content-Type header found in a %s response, "
- "which must not return content.") % code)
- if code not in NO_MESSAGE_BODY:
- assert 0, "No Content-Type header found in headers (%s)" % headers
-
-def check_exc_info(exc_info):
- assert exc_info is None or type(exc_info) is type(()), (
- "exc_info (%r) is not a tuple: %r" % (exc_info, type(exc_info)))
- # More exc_info checks?
-
-def check_iterator(iterator):
- # Technically a string is legal, which is why it's a really bad
- # idea, because it may cause the response to be returned
- # character-by-character
- assert not isinstance(iterator, str), (
- "You should not return a string as your application iterator, "
- "instead return a single-item list containing that string.")
-