diff options
author | pje <pje@571e12c6-e1fa-0310-aee7-ff1267fa46bd> | 2006-06-05 17:48:00 +0000 |
---|---|---|
committer | pje <pje@571e12c6-e1fa-0310-aee7-ff1267fa46bd> | 2006-06-05 17:48:00 +0000 |
commit | c59b8b067ce2e70a4a11238ac0bd727ff68254c0 (patch) | |
tree | c8ae3c4144a125303ecb1bc26c2d16216cfd37be /src | |
parent | 1c9e934df59c3d10d91868a5bad8df37c687569e (diff) | |
download | wsgiref-c59b8b067ce2e70a4a11238ac0bd727ff68254c0.tar.gz |
Get rid of 'src' subdirectory
git-svn-id: svn://svn.eby-sarna.com/svnroot/wsgiref@2167 571e12c6-e1fa-0310-aee7-ff1267fa46bd
Diffstat (limited to 'src')
-rw-r--r-- | src/wsgiref/__init__.py | 23 | ||||
-rw-r--r-- | src/wsgiref/handlers.py | 492 | ||||
-rw-r--r-- | src/wsgiref/headers.py | 205 | ||||
-rw-r--r-- | src/wsgiref/simple_server.py | 205 | ||||
-rw-r--r-- | src/wsgiref/tests/__init__.py | 82 | ||||
-rw-r--r-- | src/wsgiref/tests/test_handlers.py | 246 | ||||
-rw-r--r-- | src/wsgiref/tests/test_headers.py | 123 | ||||
-rw-r--r-- | src/wsgiref/tests/test_util.py | 205 | ||||
-rw-r--r-- | src/wsgiref/util.py | 205 | ||||
-rwxr-xr-x | src/wsgiref/validate.py | 429 |
10 files changed, 0 insertions, 2215 deletions
diff --git a/src/wsgiref/__init__.py b/src/wsgiref/__init__.py deleted file mode 100644 index 46c579f..0000000 --- a/src/wsgiref/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -"""wsgiref -- a WSGI (PEP 333) Reference Library - -Current Contents: - -* util -- Miscellaneous useful functions and wrappers - -* headers -- Manage response headers - -* handlers -- base classes for server/gateway implementations - -* simple_server -- a simple BaseHTTPServer that supports WSGI - -* validate -- validation wrapper that sits between an app and a server - to detect errors in either - -To-Do: - -* cgi_gateway -- Run WSGI apps under CGI (pending a deployment standard) - -* cgi_wrapper -- Run CGI apps under WSGI - -* router -- a simple middleware component that handles URL traversal -""" diff --git a/src/wsgiref/handlers.py b/src/wsgiref/handlers.py deleted file mode 100644 index 572cf0d..0000000 --- a/src/wsgiref/handlers.py +++ /dev/null @@ -1,492 +0,0 @@ -"""Base classes for server/gateway implementations""" - -from types import StringType -from util import FileWrapper, guess_scheme, is_hop_by_hop -from headers import Headers - -import sys, os, time - -try: - dict -except NameError: - def dict(items): - d = {} - for k,v in items: - d[k] = v - return d - -try: - True - False -except NameError: - True = not None - False = not True - - -# Weekday and month names for HTTP date/time formatting; always English! -_weekdayname = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] -_monthname = [None, # Dummy so we can use 1-based month numbers - "Jan", "Feb", "Mar", "Apr", "May", "Jun", - "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] - -def format_date_time(timestamp): - year, month, day, hh, mm, ss, wd, y, z = time.gmtime(timestamp) - return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % ( - _weekdayname[wd], day, _monthname[month], year, hh, mm, ss - ) - - - - - -class BaseHandler: - """Manage the invocation of a WSGI application""" - - # Configuration parameters; can override per-subclass or per-instance - wsgi_version = (1,0) - wsgi_multithread = True - wsgi_multiprocess = True - wsgi_run_once = False - - origin_server = True # We are transmitting direct to client - http_version = "1.0" # Version that should be used for response - server_software = None # String name of server software, if any - - # os_environ is used to supply configuration from the OS environment: - # by default it's a copy of 'os.environ' as of import time, but you can - # override this in e.g. your __init__ method. - os_environ = dict(os.environ.items()) - - # Collaborator classes - wsgi_file_wrapper = FileWrapper # set to None to disable - headers_class = Headers # must be a Headers-like class - - # Error handling (also per-subclass or per-instance) - traceback_limit = None # Print entire traceback to self.get_stderr() - error_status = "500 Dude, this is whack!" - error_headers = [('Content-Type','text/plain')] - error_body = "A server error occurred. Please contact the administrator." - - # State variables (don't mess with these) - status = result = None - headers_sent = False - headers = None - bytes_sent = 0 - - - - - - - - - def run(self, application): - """Invoke the application""" - # Note to self: don't move the close()! Asynchronous servers shouldn't - # call close() from finish_response(), so if you close() anywhere but - # the double-error branch here, you'll break asynchronous servers by - # prematurely closing. Async servers must return from 'run()' without - # closing if there might still be output to iterate over. - try: - self.setup_environ() - self.result = application(self.environ, self.start_response) - self.finish_response() - except: - try: - self.handle_error() - except: - # If we get an error handling an error, just give up already! - self.close() - raise # ...and let the actual server figure it out. - - - def setup_environ(self): - """Set up the environment for one request""" - - env = self.environ = self.os_environ.copy() - self.add_cgi_vars() - - env['wsgi.input'] = self.get_stdin() - env['wsgi.errors'] = self.get_stderr() - env['wsgi.version'] = self.wsgi_version - env['wsgi.run_once'] = self.wsgi_run_once - env['wsgi.url_scheme'] = self.get_scheme() - env['wsgi.multithread'] = self.wsgi_multithread - env['wsgi.multiprocess'] = self.wsgi_multiprocess - - if self.wsgi_file_wrapper is not None: - env['wsgi.file_wrapper'] = self.wsgi_file_wrapper - - if self.origin_server and self.server_software: - env.setdefault('SERVER_SOFTWARE',self.server_software) - - - def finish_response(self): - """Send any iterable data, then close self and the iterable - - Subclasses intended for use in asynchronous servers will - want to redefine this method, such that it sets up callbacks - in the event loop to iterate over the data, and to call - 'self.close()' once the response is finished. - """ - if not self.result_is_file() or not self.sendfile(): - for data in self.result: - self.write(data) - self.finish_content() - self.close() - - - def get_scheme(self): - """Return the URL scheme being used""" - return guess_scheme(self.environ) - - - def set_content_length(self): - """Compute Content-Length or switch to chunked encoding if possible""" - try: - blocks = len(self.result) - except (TypeError,AttributeError,NotImplementedError): - pass - else: - if blocks==1: - self.headers['Content-Length'] = str(self.bytes_sent) - return - # XXX Try for chunked encoding if origin server and client is 1.1 - - - def cleanup_headers(self): - """Make any necessary header changes or defaults - - Subclasses can extend this to add other defaults. - """ - if not self.headers.has_key('Content-Length'): - self.set_content_length() - - def start_response(self, status, headers,exc_info=None): - """'start_response()' callable as specified by PEP 333""" - - if exc_info: - try: - if self.headers_sent: - # Re-raise original exception if headers sent - raise exc_info[0], exc_info[1], exc_info[2] - finally: - exc_info = None # avoid dangling circular ref - elif self.headers is not None: - raise AssertionError("Headers already set!") - - assert type(status) is StringType,"Status must be a string" - assert len(status)>=4,"Status must be at least 4 characters" - assert int(status[:3]),"Status message must begin w/3-digit code" - assert status[3]==" ", "Status message must have a space after code" - if __debug__: - for name,val in headers: - assert type(name) is StringType,"Header names must be strings" - assert type(val) is StringType,"Header values must be strings" - assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed" - self.status = status - self.headers = self.headers_class(headers) - return self.write - - - def send_preamble(self): - """Transmit version/status/date/server, via self._write()""" - if self.origin_server: - if self.client_is_modern(): - self._write('HTTP/%s %s\r\n' % (self.http_version,self.status)) - if not self.headers.has_key('Date'): - self._write( - 'Date: %s\r\n' % format_date_time(time.time()) - ) - if self.server_software and not self.headers.has_key('Server'): - self._write('Server: %s\r\n' % self.server_software) - else: - self._write('Status: %s\r\n' % self.status) - - def write(self, data): - """'write()' callable as specified by PEP 333""" - - assert type(data) is StringType,"write() argument must be string" - - if not self.status: - raise AssertionError("write() before start_response()") - - elif not self.headers_sent: - # Before the first output, send the stored headers - self.bytes_sent = len(data) # make sure we know content-length - self.send_headers() - else: - self.bytes_sent += len(data) - - # XXX check Content-Length and truncate if too many bytes written? - self._write(data) - self._flush() - - - def sendfile(self): - """Platform-specific file transmission - - Override this method in subclasses to support platform-specific - file transmission. It is only called if the application's - return iterable ('self.result') is an instance of - 'self.wsgi_file_wrapper'. - - This method should return a true value if it was able to actually - transmit the wrapped file-like object using a platform-specific - approach. It should return a false value if normal iteration - should be used instead. An exception can be raised to indicate - that transmission was attempted, but failed. - - NOTE: this method should call 'self.send_headers()' if - 'self.headers_sent' is false and it is going to attempt direct - transmission of the file1. - """ - return False # No platform-specific transmission by default - - - def finish_content(self): - """Ensure headers and content have both been sent""" - if not self.headers_sent: - self.headers['Content-Length'] = "0" - self.send_headers() - else: - pass # XXX check if content-length was too short? - - def close(self): - """Close the iterable (if needed) and reset all instance vars - - Subclasses may want to also drop the client connection. - """ - try: - if hasattr(self.result,'close'): - self.result.close() - finally: - self.result = self.headers = self.status = self.environ = None - self.bytes_sent = 0; self.headers_sent = False - - - def send_headers(self): - """Transmit headers to the client, via self._write()""" - self.cleanup_headers() - self.headers_sent = True - if not self.origin_server or self.client_is_modern(): - self.send_preamble() - self._write(str(self.headers)) - - - def result_is_file(self): - """True if 'self.result' is an instance of 'self.wsgi_file_wrapper'""" - wrapper = self.wsgi_file_wrapper - return wrapper is not None and isinstance(self.result,wrapper) - - - def client_is_modern(self): - """True if client can accept status and headers""" - return self.environ['SERVER_PROTOCOL'].upper() != 'HTTP/0.9' - - - def log_exception(self,exc_info): - """Log the 'exc_info' tuple in the server log - - Subclasses may override to retarget the output or change its format. - """ - try: - from traceback import print_exception - stderr = self.get_stderr() - print_exception( - exc_info[0], exc_info[1], exc_info[2], - self.traceback_limit, stderr - ) - stderr.flush() - finally: - exc_info = None - - def handle_error(self): - """Log current error, and send error output to client if possible""" - self.log_exception(sys.exc_info()) - if not self.headers_sent: - self.result = self.error_output(self.environ, self.start_response) - self.finish_response() - # XXX else: attempt advanced recovery techniques for HTML or text? - - def error_output(self, environ, start_response): - """WSGI mini-app to create error output - - By default, this just uses the 'error_status', 'error_headers', - and 'error_body' attributes to generate an output page. It can - be overridden in a subclass to dynamically generate diagnostics, - choose an appropriate message for the user's preferred language, etc. - - Note, however, that it's not recommended from a security perspective to - spit out diagnostics to any old user; ideally, you should have to do - something special to enable diagnostic output, which is why we don't - include any here! - """ - start_response(self.error_status,self.error_headers[:],sys.exc_info()) - return [self.error_body] - - - # Pure abstract methods; *must* be overridden in subclasses - - def _write(self,data): - """Override in subclass to buffer data for send to client - - It's okay if this method actually transmits the data; BaseHandler - just separates write and flush operations for greater efficiency - when the underlying system actually has such a distinction. - """ - raise NotImplementedError - - def _flush(self): - """Override in subclass to force sending of recent '_write()' calls - - It's okay if this method is a no-op (i.e., if '_write()' actually - sends the data. - """ - raise NotImplementedError - - def get_stdin(self): - """Override in subclass to return suitable 'wsgi.input'""" - raise NotImplementedError - - def get_stderr(self): - """Override in subclass to return suitable 'wsgi.errors'""" - raise NotImplementedError - - def add_cgi_vars(self): - """Override in subclass to insert CGI variables in 'self.environ'""" - raise NotImplementedError - - - - - - - - - - - -class SimpleHandler(BaseHandler): - """Handler that's just initialized with streams, environment, etc. - - This handler subclass is intended for synchronous HTTP/1.0 origin servers, - and handles sending the entire response output, given the correct inputs. - - Usage:: - - handler = BaseCGIHandler( - inp,out,err,env, multithread=False, multiprocess=True - ) - handler.run(app)""" - - def __init__(self,stdin,stdout,stderr,environ, - multithread=True, multiprocess=False - ): - self.stdin = stdin - self.stdout = stdout - self.stderr = stderr - self.base_env = environ - self.wsgi_multithread = multithread - self.wsgi_multiprocess = multiprocess - - def get_stdin(self): - return self.stdin - - def get_stderr(self): - return self.stderr - - def add_cgi_vars(self): - self.environ.update(self.base_env) - - def _write(self,data): - self.stdout.write(data) - self._write = self.stdout.write - - def _flush(self): - self.stdout.flush() - self._flush = self.stdout.flush - - -class BaseCGIHandler(SimpleHandler): - - """CGI-like systems using input/output/error streams and environ mapping - - Usage:: - - handler = BaseCGIHandler(inp,out,err,env) - handler.run(app) - - This handler class is useful for gateway protocols like ReadyExec and - FastCGI, that have usable input/output/error streams and an environment - mapping. It's also the base class for CGIHandler, which just uses - sys.stdin, os.environ, and so on. - - The constructor also takes keyword arguments 'multithread' and - 'multiprocess' (defaulting to 'True' and 'False' respectively) to control - the configuration sent to the application. It sets 'origin_server' to - False (to enable CGI-like output), and assumes that 'wsgi.run_once' is - False. - """ - - origin_server = False - - - - - - - - - - - - - - - - - - - -class CGIHandler(BaseCGIHandler): - - """CGI-based invocation via sys.stdin/stdout/stderr and os.environ - - Usage:: - - CGIHandler().run(app) - - The difference between this class and BaseCGIHandler is that it always - uses 'wsgi.run_once' of 'True', 'wsgi.multithread' of 'False', and - 'wsgi.multiprocess' of 'True'. It does not take any initialization - parameters, but always uses 'sys.stdin', 'os.environ', and friends. - - If you need to override any of these parameters, use BaseCGIHandler - instead. - """ - - wsgi_run_once = True - - def __init__(self): - BaseCGIHandler.__init__( - self, sys.stdin, sys.stdout, sys.stderr, dict(os.environ.items()), - multithread=False, multiprocess=True - ) - - - - - - - - - - - - - - - - - diff --git a/src/wsgiref/headers.py b/src/wsgiref/headers.py deleted file mode 100644 index fa9b829..0000000 --- a/src/wsgiref/headers.py +++ /dev/null @@ -1,205 +0,0 @@ -"""Manage HTTP Response Headers - -Much of this module is red-handedly pilfered from email.Message in the stdlib, -so portions are Copyright (C) 2001,2002 Python Software Foundation, and were -written by Barry Warsaw. -""" - -from types import ListType, TupleType - -# Regular expression that matches `special' characters in parameters, the -# existance of which force quoting of the parameter value. -import re -tspecials = re.compile(r'[ \(\)<>@,;:\\"/\[\]\?=]') - -def _formatparam(param, value=None, quote=1): - """Convenience function to format and return a key=value pair. - - This will quote the value if needed or if quote is true. - """ - if value is not None and len(value) > 0: - if quote or tspecials.search(value): - value = value.replace('\\', '\\\\').replace('"', r'\"') - return '%s="%s"' % (param, value) - else: - return '%s=%s' % (param, value) - else: - return param - - - - - - - - - - - - - - -class Headers: - - """Manage a collection of HTTP response headers""" - - def __init__(self,headers): - if type(headers) is not ListType: - raise TypeError("Headers must be a list of name/value tuples") - self._headers = headers - - def __len__(self): - """Return the total number of headers, including duplicates.""" - return len(self._headers) - - def __setitem__(self, name, val): - """Set the value of a header.""" - del self[name] - self._headers.append((name, val)) - - def __delitem__(self,name): - """Delete all occurrences of a header, if present. - - Does *not* raise an exception if the header is missing. - """ - name = name.lower() - self._headers[:] = [kv for kv in self._headers if kv[0].lower()<>name] - - def __getitem__(self,name): - """Get the first header value for 'name' - - Return None if the header is missing instead of raising an exception. - - Note that if the header appeared multiple times, the first exactly which - occurrance gets returned is undefined. Use getall() to get all - the values matching a header field name. - """ - return self.get(name) - - - - - - def has_key(self, name): - """Return true if the message contains the header.""" - return self.get(name) is not None - - __contains__ = has_key - - - def get_all(self, name): - """Return a list of all the values for the named field. - - These will be sorted in the order they appeared in the original header - list or were added to this instance, and may contain duplicates. Any - fields deleted and re-inserted are always appended to the header list. - If no fields exist with the given name, returns an empty list. - """ - name = name.lower() - return [kv[1] for kv in self._headers if kv[0].lower()==name] - - - def get(self,name,default=None): - """Get the first header value for 'name', or return 'default'""" - name = name.lower() - for k,v in self._headers: - if k.lower()==name: - return v - return default - - - def keys(self): - """Return a list of all the header field names. - - These will be sorted in the order they appeared in the original header - list, or were added to this instance, and may contain duplicates. - Any fields deleted and re-inserted are always appended to the header - list. - """ - return [k for k, v in self._headers] - - - - - def values(self): - """Return a list of all header values. - - These will be sorted in the order they appeared in the original header - list, or were added to this instance, and may contain duplicates. - Any fields deleted and re-inserted are always appended to the header - list. - """ - return [v for k, v in self._headers] - - def items(self): - """Get all the header fields and values. - - These will be sorted in the order they were in the original header - list, or were added to this instance, and may contain duplicates. - Any fields deleted and re-inserted are always appended to the header - list. - """ - return self._headers[:] - - def __repr__(self): - return "Headers(%s)" % `self._headers` - - def __str__(self): - """str() returns the formatted headers, complete with end line, - suitable for direct HTTP transmission.""" - return '\r\n'.join(["%s: %s" % kv for kv in self._headers]+['','']) - - def setdefault(self,name,value): - """Return first matching header value for 'name', or 'value' - - If there is no header named 'name', add a new header with name 'name' - and value 'value'.""" - result = self.get(name) - if result is None: - self._headers.append((name,value)) - return value - else: - return result - - - def add_header(self, _name, _value, **_params): - """Extended header setting. - - _name is the header field to add. keyword arguments can be used to set - additional parameters for the header field, with underscores converted - to dashes. Normally the parameter will be added as key="value" unless - value is None, in which case only the key will be added. - - Example: - - h.add_header('content-disposition', 'attachment', filename='bud.gif') - - Note that unlike the corresponding 'email.Message' method, this does - *not* handle '(charset, language, value)' tuples: all values must be - strings or None. - """ - parts = [] - if _value is not None: - parts.append(_value) - for k, v in _params.items(): - if v is None: - parts.append(k.replace('_', '-')) - else: - parts.append(_formatparam(k.replace('_', '-'), v)) - self._headers.append((_name, "; ".join(parts))) - - - - - - - - - - - - - - - - diff --git a/src/wsgiref/simple_server.py b/src/wsgiref/simple_server.py deleted file mode 100644 index d226359..0000000 --- a/src/wsgiref/simple_server.py +++ /dev/null @@ -1,205 +0,0 @@ -"""BaseHTTPServer that implements the Python WSGI protocol (PEP 333, rev 1.21) - -This is both an example of how WSGI can be implemented, and a basis for running -simple web applications on a local machine, such as might be done when testing -or debugging an application. It has not been reviewed for security issues, -however, and we strongly recommend that you use a "real" web server for -production use. - -For example usage, see the 'if __name__=="__main__"' block at the end of the -module. See also the BaseHTTPServer module docs for other API information. -""" - -from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer -import urllib, sys -from wsgiref.handlers import SimpleHandler - -__version__ = "0.1" -__all__ = ['WSGIServer','WSGIRequestHandler','demo_app'] - - -server_version = "WSGIServer/" + __version__ -sys_version = "Python/" + sys.version.split()[0] -software_version = server_version + ' ' + sys_version - - -class ServerHandler(SimpleHandler): - - server_software = software_version - - def close(self): - try: - self.request_handler.log_request( - self.status.split(' ',1)[0], self.bytes_sent - ) - finally: - SimpleHandler.close(self) - - - - - -class WSGIServer(HTTPServer): - - """BaseHTTPServer that implements the Python WSGI protocol""" - - application = None - - def server_bind(self): - """Override server_bind to store the server name.""" - HTTPServer.server_bind(self) - self.setup_environ() - - def setup_environ(self): - # Set up base environment - env = self.base_environ = {} - env['SERVER_NAME'] = self.server_name - env['GATEWAY_INTERFACE'] = 'CGI/1.1' - env['SERVER_PORT'] = str(self.server_port) - env['REMOTE_HOST']='' - env['CONTENT_LENGTH']='' - env['SCRIPT_NAME'] = '' - - def get_app(self): - return self.application - - def set_app(self,application): - self.application = application - - - - - - - - - - - - - - - -class WSGIRequestHandler(BaseHTTPRequestHandler): - - server_version = "WSGIServer/" + __version__ - - def get_environ(self): - env = self.server.base_environ.copy() - env['SERVER_PROTOCOL'] = self.request_version - env['REQUEST_METHOD'] = self.command - if '?' in self.path: - path,query = self.path.split('?',1) - else: - path,query = self.path,'' - - env['PATH_INFO'] = urllib.unquote(path) - env['QUERY_STRING'] = query - - host = self.address_string() - if host != self.client_address[0]: - env['REMOTE_HOST'] = host - env['REMOTE_ADDR'] = self.client_address[0] - - if self.headers.typeheader is None: - env['CONTENT_TYPE'] = self.headers.type - else: - env['CONTENT_TYPE'] = self.headers.typeheader - - length = self.headers.getheader('content-length') - if length: - env['CONTENT_LENGTH'] = length - - for h in self.headers.headers: - k,v = h.split(':',1) - k=k.replace('-','_').upper(); v=v.strip() - if k in env: - continue # skip content length, type,etc. - if 'HTTP_'+k in env: - env['HTTP_'+k] += ','+v # comma-separate multiple headers - else: - env['HTTP_'+k] = v - return env - - def get_stderr(self): - return sys.stderr - - def handle(self): - """Handle a single HTTP request""" - - self.raw_requestline = self.rfile.readline() - if not self.parse_request(): # An error code has been sent, just exit - return - - handler = ServerHandler( - self.rfile, self.wfile, self.get_stderr(), self.get_environ() - ) - handler.request_handler = self # backpointer for logging - handler.run(self.server.get_app()) - - - - - - - - - - - - - - - - - - - - - - - - - - -def demo_app(environ,start_response): - from StringIO import StringIO - stdout = StringIO() - print >>stdout, "Hello world!" - print >>stdout - h = environ.items(); h.sort() - for k,v in h: - print >>stdout, k,'=',`v` - start_response("200 OK", [('Content-Type','text/plain')]) - return [stdout.getvalue()] - - -if __name__ == '__main__': - server_address = ('', 8000) - httpd = WSGIServer(server_address, WSGIRequestHandler) - httpd.set_app(demo_app) - sa = httpd.socket.getsockname() - print "Serving HTTP on", sa[0], "port", sa[1], "..." - import webbrowser - webbrowser.open('http://localhost:8000/xyz?abc') - httpd.handle_request() # serve one request, then exit - - - - - - - - - - - - - - - - - - - - diff --git a/src/wsgiref/tests/__init__.py b/src/wsgiref/tests/__init__.py deleted file mode 100644 index 6b482a0..0000000 --- a/src/wsgiref/tests/__init__.py +++ /dev/null @@ -1,82 +0,0 @@ -from unittest import TestSuite, TestCase, makeSuite - -def compare_generic_iter(make_it,match): - """Utility to compare a generic 2.1/2.2+ iterator with an iterable - - If running under Python 2.2+, this tests the iterator using iter()/next(), - as well as __getitem__. 'make_it' must be a function returning a fresh - iterator to be tested (since this may test the iterator twice).""" - - it = make_it() - n = 0 - for item in match: - assert it[n]==item - n+=1 - try: - it[n] - except IndexError: - pass - else: - raise AssertionError("Too many items from __getitem__",it) - - try: - iter, StopIteration - except NameError: - pass - else: - # Only test iter mode under 2.2+ - it = make_it() - assert iter(it) is it - for item in match: - assert it.next()==item - try: - it.next() - except StopIteration: - pass - else: - raise AssertionError("Too many items from .next()",it) - - - - -def test_suite(): - - from wsgiref.tests import test_util - from wsgiref.tests import test_headers - from wsgiref.tests import test_handlers - - tests = [ - test_util.test_suite(), - test_headers.test_suite(), - test_handlers.test_suite(), - ] - - return TestSuite(tests) - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/wsgiref/tests/test_handlers.py b/src/wsgiref/tests/test_handlers.py deleted file mode 100644 index 80cbb93..0000000 --- a/src/wsgiref/tests/test_handlers.py +++ /dev/null @@ -1,246 +0,0 @@ -from __future__ import nested_scopes # Backward compat for 2.1 -from unittest import TestCase, TestSuite, makeSuite -from wsgiref.util import setup_testing_defaults -from wsgiref.headers import Headers -from wsgiref.handlers import BaseHandler, BaseCGIHandler -from StringIO import StringIO -import re - -class ErrorHandler(BaseCGIHandler): - """Simple handler subclass for testing BaseHandler""" - - def __init__(self,**kw): - setup_testing_defaults(kw) - BaseCGIHandler.__init__( - self, StringIO(''), StringIO(), StringIO(), kw, - multithread=True, multiprocess=True - ) - -class TestHandler(ErrorHandler): - """Simple handler subclass for testing BaseHandler, w/error passthru""" - - def handle_error(self): - raise # for testing, we want to see what's happening - - - - - - - - - - - - - - - - - - -class HandlerTests(TestCase): - - def checkEnvironAttrs(self, handler): - env = handler.environ - for attr in [ - 'version','multithread','multiprocess','run_once','file_wrapper' - ]: - if attr=='file_wrapper' and handler.wsgi_file_wrapper is None: - continue - self.assertEqual(getattr(handler,'wsgi_'+attr),env['wsgi.'+attr]) - - def checkOSEnviron(self,handler): - empty = {}; setup_testing_defaults(empty) - env = handler.environ - from os import environ - for k,v in environ.items(): - if not empty.has_key(k): - self.assertEqual(env[k],v) - for k,v in empty.items(): - self.failUnless(env.has_key(k)) - - def testEnviron(self): - h = TestHandler(X="Y") - h.setup_environ() - self.checkEnvironAttrs(h) - self.checkOSEnviron(h) - self.assertEqual(h.environ["X"],"Y") - - def testCGIEnviron(self): - h = BaseCGIHandler(None,None,None,{}) - h.setup_environ() - for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors': - assert h.environ.has_key(key) - - def testScheme(self): - h=TestHandler(HTTPS="on"); h.setup_environ() - self.assertEqual(h.environ['wsgi.url_scheme'],'https') - h=TestHandler(); h.setup_environ() - self.assertEqual(h.environ['wsgi.url_scheme'],'http') - - - def testAbstractMethods(self): - h = BaseHandler() - for name in [ - '_flush','get_stdin','get_stderr','add_cgi_vars' - ]: - self.assertRaises(NotImplementedError, getattr(h,name)) - self.assertRaises(NotImplementedError, h._write, "test") - - - def testContentLength(self): - # Demo one reason iteration is better than write()... ;) - - def trivial_app1(e,s): - s('200 OK',[]) - return [e['wsgi.url_scheme']] - - def trivial_app2(e,s): - s('200 OK',[])(e['wsgi.url_scheme']) - return [] - - h = TestHandler() - h.run(trivial_app1) - self.assertEqual(h.stdout.getvalue(), - "Status: 200 OK\r\n" - "Content-Length: 4\r\n" - "\r\n" - "http") - - h = TestHandler() - h.run(trivial_app2) - self.assertEqual(h.stdout.getvalue(), - "Status: 200 OK\r\n" - "\r\n" - "http") - - - - - - - - def testBasicErrorOutput(self): - - def non_error_app(e,s): - s('200 OK',[]) - return [] - - def error_app(e,s): - raise AssertionError("This should be caught by handler") - - h = ErrorHandler() - h.run(non_error_app) - self.assertEqual(h.stdout.getvalue(), - "Status: 200 OK\r\n" - "Content-Length: 0\r\n" - "\r\n") - self.assertEqual(h.stderr.getvalue(),"") - - h = ErrorHandler() - h.run(error_app) - self.assertEqual(h.stdout.getvalue(), - "Status: %s\r\n" - "Content-Type: text/plain\r\n" - "Content-Length: %d\r\n" - "\r\n%s" % (h.error_status,len(h.error_body),h.error_body)) - - self.failUnless(h.stderr.getvalue().find("AssertionError")<>-1) - - def testErrorAfterOutput(self): - MSG = "Some output has been sent" - def error_app(e,s): - s("200 OK",[])(MSG) - raise AssertionError("This should be caught by handler") - - h = ErrorHandler() - h.run(error_app) - self.assertEqual(h.stdout.getvalue(), - "Status: 200 OK\r\n" - "\r\n"+MSG) - self.failUnless(h.stderr.getvalue().find("AssertionError")<>-1) - - - def testHeaderFormats(self): - - def non_error_app(e,s): - s('200 OK',[]) - return [] - - stdpat = ( - r"HTTP/%s 200 OK\r\n" - r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n" - r"%s" r"Content-Length: 0\r\n" r"\r\n" - ) - shortpat = ( - "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n" - ) - - for ssw in "FooBar/1.0", None: - sw = ssw and "Server: %s\r\n" % ssw or "" - - for version in "1.0", "1.1": - for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1": - - h = TestHandler(SERVER_PROTOCOL=proto) - h.origin_server = False - h.http_version = version - h.server_software = ssw - h.run(non_error_app) - self.assertEqual(shortpat,h.stdout.getvalue()) - - h = TestHandler(SERVER_PROTOCOL=proto) - h.origin_server = True - h.http_version = version - h.server_software = ssw - h.run(non_error_app) - if proto=="HTTP/0.9": - self.assertEqual(h.stdout.getvalue(),"") - else: - self.failUnless( - re.match(stdpat%(version,sw), h.stdout.getvalue()), - (stdpat%(version,sw), h.stdout.getvalue()) - ) - -TestClasses = ( - HandlerTests, -) - -def test_suite(): - return TestSuite([makeSuite(t,'test') for t in TestClasses]) - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/wsgiref/tests/test_headers.py b/src/wsgiref/tests/test_headers.py deleted file mode 100644 index db9afc3..0000000 --- a/src/wsgiref/tests/test_headers.py +++ /dev/null @@ -1,123 +0,0 @@ -from unittest import TestCase, TestSuite, makeSuite -from wsgiref.headers import Headers -from wsgiref.tests import compare_generic_iter - -class HeaderTests(TestCase): - - def testMappingInterface(self): - test = [('x','y')] - self.assertEqual(len(Headers([])),0) - self.assertEqual(len(Headers(test[:])),1) - self.assertEqual(Headers(test[:]).keys(), ['x']) - self.assertEqual(Headers(test[:]).values(), ['y']) - self.assertEqual(Headers(test[:]).items(), test) - self.failIf(Headers(test).items() is test) # must be copy! - - h=Headers([]) - del h['foo'] # should not raise an error - - h['Foo'] = 'bar' - for m in h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__: - self.failUnless(m('foo')) - self.failUnless(m('Foo')) - self.failUnless(m('FOO')) - self.failIf(m('bar')) - - self.assertEqual(h['foo'],'bar') - h['foo'] = 'baz' - self.assertEqual(h['FOO'],'baz') - self.assertEqual(h.get_all('foo'),['baz']) - - self.assertEqual(h.get("foo","whee"), "baz") - self.assertEqual(h.get("zoo","whee"), "whee") - self.assertEqual(h.setdefault("foo","whee"), "baz") - self.assertEqual(h.setdefault("zoo","whee"), "whee") - self.assertEqual(h["foo"],"baz") - self.assertEqual(h["zoo"],"whee") - - def testRequireList(self): - self.assertRaises(TypeError, Headers, "foo") - - - def testExtras(self): - h = Headers([]) - self.assertEqual(str(h),'\r\n') - - h.add_header('foo','bar',baz="spam") - self.assertEqual(h['foo'], 'bar; baz="spam"') - self.assertEqual(str(h),'foo: bar; baz="spam"\r\n\r\n') - - h.add_header('Foo','bar',cheese=None) - self.assertEqual(h.get_all('foo'), - ['bar; baz="spam"', 'bar; cheese']) - - self.assertEqual(str(h), - 'foo: bar; baz="spam"\r\n' - 'Foo: bar; cheese\r\n' - '\r\n' - ) - - - - - - - - - - - - - - - - - - - - - - - - -TestClasses = ( - HeaderTests, -) - -def test_suite(): - return TestSuite([makeSuite(t,'test') for t in TestClasses]) - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/wsgiref/tests/test_util.py b/src/wsgiref/tests/test_util.py deleted file mode 100644 index b0d121f..0000000 --- a/src/wsgiref/tests/test_util.py +++ /dev/null @@ -1,205 +0,0 @@ -from unittest import TestCase, TestSuite, makeSuite -from wsgiref import util -from wsgiref.tests import compare_generic_iter -from StringIO import StringIO - -class UtilityTests(TestCase): - - def checkShift(self,sn_in,pi_in,part,sn_out,pi_out): - env = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in} - util.setup_testing_defaults(env) - self.assertEqual(util.shift_path_info(env),part) - self.assertEqual(env['PATH_INFO'],pi_out) - self.assertEqual(env['SCRIPT_NAME'],sn_out) - return env - - def checkDefault(self, key, value, alt=None): - # Check defaulting when empty - env = {} - util.setup_testing_defaults(env) - if isinstance(value,StringIO): - self.failUnless(isinstance(env[key],StringIO)) - else: - self.assertEqual(env[key],value) - - # Check existing value - env = {key:alt} - util.setup_testing_defaults(env) - self.failUnless(env[key] is alt) - - def checkCrossDefault(self,key,value,**kw): - util.setup_testing_defaults(kw) - self.assertEqual(kw[key],value) - - def checkAppURI(self,uri,**kw): - util.setup_testing_defaults(kw) - self.assertEqual(util.application_uri(kw),uri) - - def checkReqURI(self,uri,query=1,**kw): - util.setup_testing_defaults(kw) - self.assertEqual(util.request_uri(kw,query),uri) - - def checkFW(self,text,size,match): - - def make_it(text=text,size=size): - return util.FileWrapper(StringIO(text),size) - - compare_generic_iter(make_it,match) - - it = make_it() - self.failIf(it.filelike.closed) - - for item in it: - pass - - self.failIf(it.filelike.closed) - - it.close() - self.failUnless(it.filelike.closed) - - - def testSimpleShifts(self): - self.checkShift('','/', '', '/', '') - self.checkShift('','/x', 'x', '/x', '') - self.checkShift('/','', None, '/', '') - self.checkShift('/a','/x/y', 'x', '/a/x', '/y') - self.checkShift('/a','/x/', 'x', '/a/x', '/') - - - def testNormalizedShifts(self): - self.checkShift('/a/b', '/../y', '..', '/a', '/y') - self.checkShift('', '/../y', '..', '', '/y') - self.checkShift('/a/b', '//y', 'y', '/a/b/y', '') - self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/') - self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '') - self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/') - self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/') - self.checkShift('/a/b', '///', '', '/a/b/', '') - self.checkShift('/a/b', '/.//', '', '/a/b/', '') - self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/') - self.checkShift('/a/b', '/.', None, '/a/b', '') - - - def testDefaults(self): - for key, value in [ - ('SERVER_NAME','127.0.0.1'), - ('SERVER_PORT', '80'), - ('SERVER_PROTOCOL','HTTP/1.0'), - ('HTTP_HOST','127.0.0.1'), - ('REQUEST_METHOD','GET'), - ('SCRIPT_NAME',''), - ('PATH_INFO','/'), - ('wsgi.version', (1,0)), - ('wsgi.run_once', 0), - ('wsgi.multithread', 0), - ('wsgi.multiprocess', 0), - ('wsgi.input', StringIO("")), - ('wsgi.errors', StringIO()), - ('wsgi.url_scheme','http'), - ]: - self.checkDefault(key,value) - - - def testCrossDefaults(self): - self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar") - self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on") - self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1") - self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes") - self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo") - self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo") - self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on") - - - def testGuessScheme(self): - self.assertEqual(util.guess_scheme({}), "http") - self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http") - self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https") - self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https") - self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https") - - - - - - def testAppURIs(self): - self.checkAppURI("http://127.0.0.1/") - self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam") - self.checkAppURI("http://spam.example.com:2071/", - HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071") - self.checkAppURI("http://spam.example.com/", - SERVER_NAME="spam.example.com") - self.checkAppURI("http://127.0.0.1/", - HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com") - self.checkAppURI("https://127.0.0.1/", HTTPS="on") - self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000", - HTTP_HOST=None) - - def testReqURIs(self): - self.checkReqURI("http://127.0.0.1/") - self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam") - self.checkReqURI("http://127.0.0.1/spammity/spam", - SCRIPT_NAME="/spammity", PATH_INFO="/spam") - self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni", - SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni") - self.checkReqURI("http://127.0.0.1/spammity/spam", 0, - SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni") - - def testFileWrapper(self): - self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10]) - - def testHopByHop(self): - for hop in ( - "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization " - "TE Trailers Transfer-Encoding Upgrade" - ).split(): - for alt in hop, hop.title(), hop.upper(), hop.lower(): - self.failUnless(util.is_hop_by_hop(alt)) - - # Not comprehensive, just a few random header names - for hop in ( - "Accept Cache-Control Date Pragma Trailer Via Warning" - ).split(): - for alt in hop, hop.title(), hop.upper(), hop.lower(): - self.failIf(util.is_hop_by_hop(alt)) - -TestClasses = ( - UtilityTests, -) - -def test_suite(): - return TestSuite([makeSuite(t,'test') for t in TestClasses]) - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/wsgiref/util.py b/src/wsgiref/util.py deleted file mode 100644 index 0f805ec..0000000 --- a/src/wsgiref/util.py +++ /dev/null @@ -1,205 +0,0 @@ -"""Miscellaneous WSGI-related Utilities""" - -import posixpath - -__all__ = [ - 'FileWrapper', 'guess_scheme', 'application_uri', 'request_uri', - 'shift_path_info', 'setup_testing_defaults', -] - - -class FileWrapper: - """Wrapper to convert file-like objects to iterables""" - - def __init__(self, filelike, blksize=8192): - self.filelike = filelike - self.blksize = blksize - if hasattr(filelike,'close'): - self.close = filelike.close - - def __getitem__(self,key): - data = self.filelike.read(self.blksize) - if data: - return data - raise IndexError - - def __iter__(self): - return self - - def next(self): - data = self.filelike.read(self.blksize) - if data: - return data - raise StopIteration - - - - - - - - -def guess_scheme(environ): - """Return a guess for whether 'wsgi.url_scheme' should be 'http' or 'https' - """ - if environ.get("HTTPS") in ('yes','on','1'): - return 'https' - else: - return 'http' - -def application_uri(environ): - """Return the application's base URI (no PATH_INFO or QUERY_STRING)""" - url = environ['wsgi.url_scheme']+'://' - from urllib import quote - - if environ.get('HTTP_HOST'): - url += environ['HTTP_HOST'] - else: - url += environ['SERVER_NAME'] - - if environ['wsgi.url_scheme'] == 'https': - if environ['SERVER_PORT'] != '443': - url += ':' + environ['SERVER_PORT'] - else: - if environ['SERVER_PORT'] != '80': - url += ':' + environ['SERVER_PORT'] - - url += quote(environ.get('SCRIPT_NAME') or '/') - return url - -def request_uri(environ,include_query=1): - """Return the full request URI, optionally including the query string""" - url = application_uri(environ) - from urllib import quote - path_info = quote(environ.get('PATH_INFO','')) - if not environ.get('SCRIPT_NAME'): - url += path_info[1:] - else: - url += path_info - if include_query and environ.get('QUERY_STRING'): - url += '?' + environ['QUERY_STRING'] - return url - -def shift_path_info(environ): - """Shift a name from PATH_INFO to SCRIPT_NAME, returning it - - If there are no remaining path segments in PATH_INFO, return None. - Note: 'environ' is modified in-place; use a copy if you need to keep - the original PATH_INFO or SCRIPT_NAME. - - Note: when PATH_INFO is just a '/', this returns '' and appends a trailing - '/' to SCRIPT_NAME, even though empty path segments are normally ignored, - and SCRIPT_NAME doesn't normally end in a '/'. This is intentional - behavior, to ensure that an application can tell the difference between - '/x' and '/x/' when traversing to objects. - """ - path_info = environ.get('PATH_INFO','') - if not path_info: - return None - - path_parts = path_info.split('/') - path_parts[1:-1] = [p for p in path_parts[1:-1] if p and p<>'.'] - name = path_parts[1] - del path_parts[1] - - script_name = environ.get('SCRIPT_NAME','') - script_name = posixpath.normpath(script_name+'/'+name) - if script_name.endswith('/'): - script_name = script_name[:-1] - if not name and not script_name.endswith('/'): - script_name += '/' - - environ['SCRIPT_NAME'] = script_name - environ['PATH_INFO'] = '/'.join(path_parts) - - # Special case: '/.' on PATH_INFO doesn't get stripped, - # because we don't strip the last element of PATH_INFO - # if there's only one path part left. Instead of fixing this - # above, we fix it here so that PATH_INFO gets normalized to - # an empty string in the environ. - if name=='.': - name = None - return name - -def setup_testing_defaults(environ): - """Update 'environ' with trivial defaults for testing purposes - - This adds various parameters required for WSGI, including HTTP_HOST, - SERVER_NAME, SERVER_PORT, REQUEST_METHOD, SCRIPT_NAME, PATH_INFO, - and all of the wsgi.* variables. It only supplies default values, - and does not replace any existing settings for these variables. - - This routine is intended to make it easier for unit tests of WSGI - servers and applications to set up dummy environments. It should *not* - be used by actual WSGI servers or applications, since the data is fake! - """ - - environ.setdefault('SERVER_NAME','127.0.0.1') - environ.setdefault('SERVER_PROTOCOL','HTTP/1.0') - - environ.setdefault('HTTP_HOST',environ['SERVER_NAME']) - environ.setdefault('REQUEST_METHOD','GET') - - if 'SCRIPT_NAME' not in environ and 'PATH_INFO' not in environ: - environ.setdefault('SCRIPT_NAME','') - environ.setdefault('PATH_INFO','/') - - environ.setdefault('wsgi.version', (1,0)) - environ.setdefault('wsgi.run_once', 0) - environ.setdefault('wsgi.multithread', 0) - environ.setdefault('wsgi.multiprocess', 0) - - from StringIO import StringIO - environ.setdefault('wsgi.input', StringIO("")) - environ.setdefault('wsgi.errors', StringIO()) - environ.setdefault('wsgi.url_scheme',guess_scheme(environ)) - - if environ['wsgi.url_scheme']=='http': - environ.setdefault('SERVER_PORT', '80') - elif environ['wsgi.url_scheme']=='https': - environ.setdefault('SERVER_PORT', '443') - - - - -_hoppish = { - 'connection':1, 'keep-alive':1, 'proxy-authenticate':1, - 'proxy-authorization':1, 'te':1, 'trailers':1, 'transfer-encoding':1, - 'upgrade':1 -}.has_key - -def is_hop_by_hop(header_name): - """Return true if 'header_name' is an HTTP/1.1 "Hop-by-Hop" header""" - return _hoppish(header_name.lower()) - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/wsgiref/validate.py b/src/wsgiref/validate.py deleted file mode 100755 index af903df..0000000 --- a/src/wsgiref/validate.py +++ /dev/null @@ -1,429 +0,0 @@ -# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) -# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php -# Also licenced under the Apache License, 2.0: http://opensource.org/licenses/apache2.0.php -# Licensed to PSF under a Contributor Agreement -""" -Middleware to check for obedience to the WSGI specification. - -Some of the things this checks: - -* Signature of the application and start_response (including that - keyword arguments are not used). - -* Environment checks: - - - Environment is a dictionary (and not a subclass). - - - That all the required keys are in the environment: REQUEST_METHOD, - SERVER_NAME, SERVER_PORT, wsgi.version, wsgi.input, wsgi.errors, - wsgi.multithread, wsgi.multiprocess, wsgi.run_once - - - That HTTP_CONTENT_TYPE and HTTP_CONTENT_LENGTH are not in the - environment (these headers should appear as CONTENT_LENGTH and - CONTENT_TYPE). - - - Warns if QUERY_STRING is missing, as the cgi module acts - unpredictably in that case. - - - That CGI-style variables (that don't contain a .) have - (non-unicode) string values - - - That wsgi.version is a tuple - - - That wsgi.url_scheme is 'http' or 'https' (@@: is this too - restrictive?) - - - Warns if the REQUEST_METHOD is not known (@@: probably too - restrictive). - - - That SCRIPT_NAME and PATH_INFO are empty or start with / - - - That at least one of SCRIPT_NAME or PATH_INFO are set. - - - That CONTENT_LENGTH is a positive integer. - - - That SCRIPT_NAME is not '/' (it should be '', and PATH_INFO should - be '/'). - - - That wsgi.input has the methods read, readline, readlines, and - __iter__ - - - That wsgi.errors has the methods flush, write, writelines - -* The status is a string, contains a space, starts with an integer, - and that integer is in range (> 100). - -* That the headers is a list (not a subclass, not another kind of - sequence). - -* That the items of the headers are tuples of strings. - -* That there is no 'status' header (that is used in CGI, but not in - WSGI). - -* That the headers don't contain newlines or colons, end in _ or -, or - contain characters codes below 037. - -* That Content-Type is given if there is content (CGI often has a - default content type, but WSGI does not). - -* That no Content-Type is given when there is no content (@@: is this - too restrictive?) - -* That the exc_info argument to start_response is a tuple or None. - -* That all calls to the writer are with strings, and no other methods - on the writer are accessed. - -* That wsgi.input is used properly: - - - .read() is called with zero or one argument - - - That it returns a string - - - That readline, readlines, and __iter__ return strings - - - That .close() is not called - - - No other methods are provided - -* That wsgi.errors is used properly: - - - .write() and .writelines() is called with a string - - - That .close() is not called, and no other methods are provided. - -* The response iterator: - - - That it is not a string (it should be a list of a single string; a - string will work, but perform horribly). - - - That .next() returns a string - - - That the iterator is not iterated over until start_response has - been called (that can signal either a server or application - error). - - - That .close() is called (doesn't raise exception, only prints to - sys.stderr, because we only know it isn't called when the object - is garbage collected). -""" -__all__ = ['middleware'] - - -import re -import sys -from types import DictType, StringType, TupleType, ListType -import warnings - -header_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9\-_]*$') -bad_header_value_re = re.compile(r'[\000-\037]') - -class WSGIWarning(Warning): - """ - Raised in response to WSGI-spec-related warnings - """ - -def middleware(application): - - """ - When applied between a WSGI server and a WSGI application, this - middleware will check for WSGI compliancy on a number of levels. - This middleware does not modify the request or response in any - way, but will throw an AssertionError if anything seems off - (except for a failure to close the application iterator, which - will be printed to stderr -- there's no way to throw an exception - at that point). - """ - - def lint_app(*args, **kw): - assert len(args) == 2, "Two arguments required" - assert not kw, "No keyword arguments allowed" - environ, start_response = args - - check_environ(environ) - - # We use this to check if the application returns without - # calling start_response: - start_response_started = [] - - def start_response_wrapper(*args, **kw): - assert len(args) == 2 or len(args) == 3, ( - "Invalid number of arguments: %s" % args) - assert not kw, "No keyword arguments allowed" - status = args[0] - headers = args[1] - if len(args) == 3: - exc_info = args[2] - else: - exc_info = None - - check_status(status) - check_headers(headers) - check_content_type(status, headers) - check_exc_info(exc_info) - - start_response_started.append(None) - return WriteWrapper(start_response(*args)) - - environ['wsgi.input'] = InputWrapper(environ['wsgi.input']) - environ['wsgi.errors'] = ErrorWrapper(environ['wsgi.errors']) - - iterator = application(environ, start_response_wrapper) - assert iterator is not None and iterator != False, ( - "The application must return an iterator, if only an empty list") - - check_iterator(iterator) - - return IteratorWrapper(iterator, start_response_started) - - return lint_app - -class InputWrapper: - - def __init__(self, wsgi_input): - self.input = wsgi_input - - def read(self, *args): - assert len(args) <= 1 - v = self.input.read(*args) - assert type(v) is type("") - return v - - def readline(self): - v = self.input.readline() - assert type(v) is type("") - return v - - def readlines(self, *args): - assert len(args) <= 1 - lines = self.input.readlines(*args) - assert type(lines) is type([]) - for line in lines: - assert type(line) is type("") - return lines - - def __iter__(self): - while 1: - line = self.readline() - if not line: - return - yield line - - def close(self): - assert 0, "input.close() must not be called" - -class ErrorWrapper: - - def __init__(self, wsgi_errors): - self.errors = wsgi_errors - - def write(self, s): - assert type(s) is type("") - self.errors.write(s) - - def flush(self): - self.errors.flush() - - def writelines(self, seq): - for line in seq: - self.write(line) - - def close(self): - assert 0, "errors.close() must not be called" - -class WriteWrapper: - - def __init__(self, wsgi_writer): - self.writer = wsgi_writer - - def __call__(self, s): - assert type(s) is type("") - self.writer(s) - -class PartialIteratorWrapper: - - def __init__(self, wsgi_iterator): - self.iterator = wsgi_iterator - - def __iter__(self): - # We want to make sure __iter__ is called - return IteratorWrapper(self.iterator) - -class IteratorWrapper: - - def __init__(self, wsgi_iterator, check_start_response): - self.original_iterator = wsgi_iterator - self.iterator = iter(wsgi_iterator) - self.closed = False - self.check_start_response = check_start_response - - def __iter__(self): - return self - - def next(self): - assert not self.closed, ( - "Iterator read after closed") - v = self.iterator.next() - if self.check_start_response is not None: - assert self.check_start_response, ( - "The application returns and we started iterating over its body, but start_response has not yet been called") - self.check_start_response = None - return v - - def close(self): - self.closed = True - if hasattr(self.original_iterator, 'close'): - self.original_iterator.close() - - def __del__(self): - if not self.closed: - sys.stderr.write( - "Iterator garbage collected without being closed") - assert self.closed, ( - "Iterator garbage collected without being closed") - -def check_environ(environ): - assert type(environ) is DictType, ( - "Environment is not of the right type: %r (environment: %r)" - % (type(environ), environ)) - - for key in ['REQUEST_METHOD', 'SERVER_NAME', 'SERVER_PORT', - 'wsgi.version', 'wsgi.input', 'wsgi.errors', - 'wsgi.multithread', 'wsgi.multiprocess', - 'wsgi.run_once']: - assert key in environ, ( - "Environment missing required key: %r" % key) - - for key in ['HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH']: - assert key not in environ, ( - "Environment should not have the key: %s " - "(use %s instead)" % (key, key[5:])) - - if 'QUERY_STRING' not in environ: - warnings.warn( - 'QUERY_STRING is not in the WSGI environment; the cgi ' - 'module will use sys.argv when this variable is missing, ' - 'so application errors are more likely', - WSGIWarning) - - for key in environ.keys(): - if '.' in key: - # Extension, we don't care about its type - continue - assert type(environ[key]) is StringType, ( - "Environmental variable %s is not a string: %r (value: %r)" - % (type(environ[key]), environ[key])) - - assert type(environ['wsgi.version']) is TupleType, ( - "wsgi.version should be a tuple (%r)" % environ['wsgi.version']) - assert environ['wsgi.url_scheme'] in ('http', 'https'), ( - "wsgi.url_scheme unknown: %r" % environ['wsgi.url_scheme']) - - check_input(environ['wsgi.input']) - check_errors(environ['wsgi.errors']) - - # @@: these need filling out: - if environ['REQUEST_METHOD'] not in ( - 'GET', 'HEAD', 'POST', 'OPTIONS','PUT','DELETE','TRACE'): - warnings.warn( - "Unknown REQUEST_METHOD: %r" % environ['REQUEST_METHOD'], - WSGIWarning) - - assert (not environ.get('SCRIPT_NAME') - or environ['SCRIPT_NAME'].startswith('/')), ( - "SCRIPT_NAME doesn't start with /: %r" % environ['SCRIPT_NAME']) - assert (not environ.get('PATH_INFO') - or environ['PATH_INFO'].startswith('/')), ( - "PATH_INFO doesn't start with /: %r" % environ['PATH_INFO']) - if environ.get('CONTENT_LENGTH'): - assert int(environ['CONTENT_LENGTH']) >= 0, ( - "Invalid CONTENT_LENGTH: %r" % environ['CONTENT_LENGTH']) - - if not environ.get('SCRIPT_NAME'): - assert environ.has_key('PATH_INFO'), ( - "One of SCRIPT_NAME or PATH_INFO are required (PATH_INFO " - "should at least be '/' if SCRIPT_NAME is empty)") - assert environ.get('SCRIPT_NAME') != '/', ( - "SCRIPT_NAME cannot be '/'; it should instead be '', and " - "PATH_INFO should be '/'") - -def check_input(wsgi_input): - for attr in ['read', 'readline', 'readlines', '__iter__']: - assert hasattr(wsgi_input, attr), ( - "wsgi.input (%r) doesn't have the attribute %s" - % (wsgi_input, attr)) - -def check_errors(wsgi_errors): - for attr in ['flush', 'write', 'writelines']: - assert hasattr(wsgi_errors, attr), ( - "wsgi.errors (%r) doesn't have the attribute %s" - % (wsgi_errors, attr)) - -def check_status(status): - assert type(status) is StringType, ( - "Status must be a string (not %r)" % status) - # Implicitly check that we can turn it into an integer: - status_code = status.split(None, 1)[0] - assert len(status_code) == 3, ( - "Status codes must be three characters: %r" % status_code) - status_int = int(status_code) - assert status_int >= 100, "Status code is invalid: %r" % status_int - if len(status) < 4 or status[3] != ' ': - warnings.warn( - "The status string (%r) should be a three-digit integer " - "followed by a single space and a status explanation" - % status, WSGIWarning) - -def check_headers(headers): - assert type(headers) is ListType, ( - "Headers (%r) must be of type list: %r" - % (headers, type(headers))) - header_names = {} - for item in headers: - assert type(item) is TupleType, ( - "Individual headers (%r) must be of type tuple: %r" - % (item, type(item))) - assert len(item) == 2 - name, value = item - assert name.lower() != 'status', ( - "The Status header cannot be used; it conflicts with CGI " - "script, and HTTP status is not given through headers " - "(value: %r)." % value) - header_names[name.lower()] = None - assert '\n' not in name and ':' not in name, ( - "Header names may not contain ':' or '\\n': %r" % name) - assert header_re.search(name), "Bad header name: %r" % name - assert not name.endswith('-') and not name.endswith('_'), ( - "Names may not end in '-' or '_': %r" % name) - assert not bad_header_value_re.search(value), ( - "Bad header value: %r (bad char: %r)" - % (value, bad_header_value_re.search(value).group(0))) - -def check_content_type(status, headers): - code = int(status.split(None, 1)[0]) - # @@: need one more person to verify this interpretation of RFC 2616 - # http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html - NO_MESSAGE_BODY = (204, 304) - for name, value in headers: - if name.lower() == 'content-type': - if code not in NO_MESSAGE_BODY: - return - assert 0, (("Content-Type header found in a %s response, " - "which must not return content.") % code) - if code not in NO_MESSAGE_BODY: - assert 0, "No Content-Type header found in headers (%s)" % headers - -def check_exc_info(exc_info): - assert exc_info is None or type(exc_info) is type(()), ( - "exc_info (%r) is not a tuple: %r" % (exc_info, type(exc_info))) - # More exc_info checks? - -def check_iterator(iterator): - # Technically a string is legal, which is why it's a really bad - # idea, because it may cause the response to be returned - # character-by-character - assert not isinstance(iterator, str), ( - "You should not return a string as your application iterator, " - "instead return a single-item list containing that string.") - |