diff options
author | ianb <devnull@localhost> | 2005-04-22 03:18:20 +0000 |
---|---|---|
committer | ianb <devnull@localhost> | 2005-04-22 03:18:20 +0000 |
commit | 36593a8d40cd1417d2927a78cdbe621090b6e8a5 (patch) | |
tree | 74dc42b93bb512593869fd57f8be99c198d8c108 /paste/lint.py | |
parent | 7752a3102fc8aa68ee7e349495bc2e9345c77c00 (diff) | |
download | paste-36593a8d40cd1417d2927a78cdbe621090b6e8a5.tar.gz |
Renamed package itself
Diffstat (limited to 'paste/lint.py')
-rw-r--r-- | paste/lint.py | 259 |
1 files changed, 259 insertions, 0 deletions
diff --git a/paste/lint.py b/paste/lint.py new file mode 100644 index 0000000..0eac637 --- /dev/null +++ b/paste/lint.py @@ -0,0 +1,259 @@ +""" +A lint of sorts; an anal middleware that checks for WSGI compliance +both in the server and the application (but otherwise does not effect +the request, it just looks at the communication). +""" + +import re +import sys +from types import * + +header_re = re.compile(r'^[a-zA-Z][a-zA-Z0-9\-_]*$') +bad_header_value_re = re.compile(r'[\000-\037]') + +def middleware(application): + def lint_app(*args, **kw): + assert len(args) == 2, "Two arguments required" + assert not kw, "No keyword arguments allowed" + environ, start_response = args + + check_environ(environ) + + # We use this to check if the application returns without + # calling start_response: + start_response_started = [] + + def start_response_wrapper(*args, **kw): + assert len(args) == 2 or len(args) == 3, "Invalid number of arguments: %s" % args + assert not kw, "No keyword arguments allowed" + status = args[0] + headers = args[1] + if len(args) == 3: + exc_info = args[2] + else: + exc_info = None + + check_status(status) + check_headers(headers) + check_content_type(status, headers) + check_exc_info(exc_info) + + start_response_started.append(None) + return WriteWrapper(start_response(*args)) + + environ['wsgi.input'] = InputWrapper(environ['wsgi.input']) + environ['wsgi.errors'] = ErrorWrapper(environ['wsgi.errors']) + + iterator = application(environ, start_response_wrapper) + assert start_response_started, ( + "The application returned, but did not call start_response()") + assert iterator is not None and iterator != False, \ + "The application must return an iterator, if only an empty list" + + check_iterator(iterator) + + return IteratorWrapper(iterator) + + return lint_app + +class InputWrapper: + + def __init__(self, wsgi_input): + self.input = wsgi_input + + def read(self, *args): + assert len(args) <= 1 + v = self.input.read(*args) + assert type(v) is type("") + return v + + def readline(self): + v = self.input.readline() + assert type(v) is type("") + return v + + def readlines(self, *args): + assert len(args) <= 1 + lines = self.input.readlines(*args) + assert type(lines) is type([]) + for line in lines: + assert type(line) is type("") + return lines + + def __iter__(self): + while 1: + line = self.readline() + if not line: + return + yield line + + def close(self): + assert 0, "input.close() must not be called" + +class ErrorWrapper: + + def __init__(self, wsgi_errors): + self.errors = wsgi_errors + + def write(self, s): + assert type(s) is type("") + self.errors.write(s) + + def flush(self): + self.errors.flush() + + def writelines(self, seq): + for line in seq: + self.write(line) + + def close(self): + assert 0, "errors.close() must not be called" + +class WriteWrapper: + + def __init__(self, wsgi_writer): + self.writer = wsgi_writer + + def __call__(self, s): + assert type(s) is type("") + self.writer(s) + +class PartialIteratorWrapper: + + def __init__(self, wsgi_iterator): + self.iterator = iterator + + def __iter__(self): + # We want to make sure __iter__ is called + return IteratorWrapper(self.iterator) + +class IteratorWrapper: + + def __init__(self, wsgi_iterator): + self.original_iterator = wsgi_iterator + self.iterator = iter(wsgi_iterator) + self.closed = False + + def __iter__(self): + return self + + def next(self): + assert not self.closed, \ + "Iterator read after closed" + return self.iterator.next() + + def close(self): + self.closed = True + if hasattr(self.original_iterator, 'close'): + self.original_iterator.close() + + def __del__(self): + if not self.closed: + sys.stderr.write("Iterator garbage collected without being closed") + assert self.closed, \ + "Iterator garbage collected without being closed" + +def check_environ(environ): + assert type(environ) is DictType, \ + "Environment is not of the right type: %r (environment: %r)" % (type(environ), environ) + + for key in ['REQUEST_METHOD', 'SERVER_NAME', 'SERVER_PORT', + 'wsgi.version', 'wsgi.input', 'wsgi.errors', + 'wsgi.multithread', 'wsgi.multiprocess', + 'wsgi.run_once']: + assert environ.has_key(key), \ + "Environment missing required key: %r" % key + + for key in environ.keys(): + if '.' in key: + # Extension, we don't care about its type + continue + assert type(environ[key]) is StringType, \ + "Environmental variable %s is not a string: %r (value: %r)" % (type(environ[key]), environ[key]) + + assert type(environ['wsgi.version']) is TupleType, \ + "wsgi.version should be a tuple (%r)" % environ['wsgi.version'] + assert environ['wsgi.url_scheme'] in ('http', 'https'), \ + "wsgi.url_scheme unknown: %r" % environ['wsgi.url_scheme'] + + check_input(environ['wsgi.input']) + check_errors(environ['wsgi.errors']) + + # @@: these need filling out: + assert environ['REQUEST_METHOD'] in ('GET', 'HEAD', 'POST'), "Unknown REQUEST_METHOD: %r" % environ['REQUEST_METHOD'] + + assert (not environ.get('SCRIPT_NAME') + or environ['SCRIPT_NAME'].startswith('/')), \ + "SCRIPT_NAME doesn't start with /: %r" % environ['SCRIPT_NAME'] + assert (not environ.get('PATH_INFO') + or environ['PATH_INFO'].startswith('/')), \ + "PATH_INFO doesn't start with /: %s" % environ['PATH_INFO'] + if environ.get('CONTENT_LENGTH'): + assert int(environ['CONTENT_LENGTH']) >= 0, "Invalid CONTENT_LENGTH: %r" % environ['CONTENT_LENGTH'] + + if not environ.get('SCRIPT_NAME'): + assert environ.has_key('PATH_INFO'), \ + "One of SCRIPT_NAME or PATH_INFO are required (PATH_INFO should at least be '/' if SCRIPT_NAME is empty)" + assert environ.get('SCRIPT_NAME') != '/', \ + "SCRIPT_NAME cannot be '/'; it should instead be '', and PATH_INFO should be '/'" + +def check_input(wsgi_input): + for attr in ['read', 'readline', 'readlines', '__iter__']: + assert hasattr(wsgi_input, attr), \ + "wsgi.input (%r) doesn't have the attribute %s" % (wsgi_input, attr) + +def check_errors(wsgi_errors): + for attr in ['flush', 'write', 'writelines']: + assert hasattr(wsgi_errors, attr), \ + "wsgi.errors (%r) doesn't have the attributes %s" % (wsgi_errors, attr) + +def check_status(status): + assert type(status) is StringType, \ + "Status must be a string (not %r)" % status + # Implicitly check that we can turn it into an integer: + status_int = int(status.split(None, 1)[0]) + assert status_int >= 100, "Status code is invalid: %r" % status_int + +def check_headers(headers): + assert type(headers) is ListType, \ + "Headers (%r) must be of type list: %r" % (headers, type(headers)) + header_names = {} + for item in headers: + assert type(item) is TupleType, \ + "Individual headers (%r) must be of type tuple: %r" % (item, type(item)) + assert len(item) == 2 + name, value = item + assert name.lower() != 'status', \ + "The Status header cannot be used; it conflicts with CGI script, and HTTP status is not given through headers (value: %r)." % value + header_names[name.lower()] = None + assert '\n' not in name and ':' not in name, \ + "Header names may not contain ':' or '\\n': %r" % name + assert header_re.search(name), "Bad header name: %r" % name + assert not name.endswith('-') and not name.endswith('_'), \ + "Names may not end in '-' or '_': %r" % name + assert not bad_header_value_re.search(value), \ + "Bad header value: %r (bad char: %r)" % (value, bad_header_value_re.search(value).group(0)) + +def check_content_type(status, headers): + code = int(status.split(None, 1)[0]) + if code == 204: + # 204 No Content is the only code where there's no body, + # and so it doesn't need a content-type header. + # @@: Not 100% sure this is the only case where a content-type + # header can be left out + return + for name, value in headers: + if name.lower() == 'content-type': + return + assert 0, "No Content-Type header found in headers (%s)" % headers + +def check_exc_info(exc_info): + assert not exc_info or type(exc_info) is type(()), "exc_info (%r) is not a tuple: %r" % (exc_info, type(exc_info)) + # More exc_info checks? + +def check_iterator(iterator): + # Technically a string is legal, which is why it's a really bad + # idea, because it may cause the response to be returned + # character-by-character + assert not isinstance(iterator, str), \ + "You should not return a string as your application iterator, instead return a single-item list containing that string." |