diff options
| author | Gael Pasgrimaud <gael@gawel.org> | 2011-08-20 17:57:26 +0200 |
|---|---|---|
| committer | Gael Pasgrimaud <gael@gawel.org> | 2011-08-20 17:57:26 +0200 |
| commit | 53889b57fe16c57fd7f532953d2e15bfaba7e5b3 (patch) | |
| tree | c1f4edaf363fd674c0d8407749788ab7d8a1d07e /webtest | |
| parent | 3ca2d32a7da7d2cc92702a45afd34fb707408f75 (diff) | |
| download | webtest-53889b57fe16c57fd7f532953d2e15bfaba7e5b3.tar.gz | |
move __init__ to testapp module; few doc improvements
Diffstat (limited to 'webtest')
| -rw-r--r-- | webtest/__init__.py | 1613 | ||||
| -rw-r--r-- | webtest/testapp.py | 1609 |
2 files changed, 1622 insertions, 1600 deletions
diff --git a/webtest/__init__.py b/webtest/__init__.py index f258c25..a1acbf3 100644 --- a/webtest/__init__.py +++ b/webtest/__init__.py @@ -6,1604 +6,17 @@ Routines for testing WSGI applications. Most interesting is TestApp """ -import sys -import random -import urllib -import warnings -import mimetypes -import time -import cgi -import os -import re -import fnmatch -from webtest.compat import urlparse -from webtest.compat import print_stderr -from webtest.compat import StringIO -from webtest.compat import SimpleCookie, CookieError -from webtest.compat import cookie_quote -from webob import Request, Response -from webtest import lint +from webtest.testapp import TestApp +from webtest.testapp import TestRequest +from webtest.testapp import TestResponse +from webtest.testapp import Form +from webtest.testapp import Field +from webtest.testapp import AppError +from webtest.testapp import Select +from webtest.testapp import Radio +from webtest.testapp import Checkbox +from webtest.testapp import Text +from webtest.testapp import Textarea +from webtest.testapp import Hidden +from webtest.testapp import Submit -__all__ = ['TestApp', 'TestRequest'] - -def tempnam_no_warning(*args): - """ - An os.tempnam with the warning turned off, because sometimes - you just need to use this and don't care about the stupid - security warning. - """ - return os.tempnam(*args) - -class NoDefault(object): - pass - -try: - sorted -except NameError: - def sorted(l): - l = list(l) - l.sort() - return l - -class AppError(Exception): - pass - -class CaptureStdout(object): - - def __init__(self, actual): - self.captured = StringIO() - self.actual = actual - - def write(self, s): - self.captured.write(s) - self.actual.write(s) - - def flush(self): - self.actual.flush() - - def writelines(self, lines): - for item in lines: - self.write(item) - - def getvalue(self): - return self.captured.getvalue() - -class TestResponse(Response): - - """ - Instances of this class are return by ``TestApp`` - """ - - _forms_indexed = None - - - def forms__get(self): - """ - Returns a dictionary of ``Form`` objects. Indexes are both in - order (from zero) and by form id (if the form is given an id). - """ - if self._forms_indexed is None: - self._parse_forms() - return self._forms_indexed - - forms = property(forms__get, - doc=""" - A list of <form>s found on the page (instances of - ``Form``) - """) - - def form__get(self): - forms = self.forms - if not forms: - raise TypeError( - "You used response.form, but no forms exist") - if 1 in forms: - # There is more than one form - raise TypeError( - "You used response.form, but more than one form exists") - return forms[0] - - form = property(form__get, - doc=""" - Returns a single ``Form`` instance; it - is an error if there are multiple forms on the - page. - """) - - @property - def testbody(self): - if getattr(self, '_use_unicode', True) and self.charset: - return self.unicode_body - return self.body - - _tag_re = re.compile(r'<(/?)([:a-z0-9_\-]*)(.*?)>', re.S|re.I) - - def _parse_forms(self): - forms = self._forms_indexed = {} - form_texts = [] - started = None - for match in self._tag_re.finditer(self.testbody): - end = match.group(1) == '/' - tag = match.group(2).lower() - if tag != 'form': - continue - if end: - assert started, ( - "</form> unexpected at %s" % match.start()) - form_texts.append(self.testbody[started:match.end()]) - started = None - else: - assert not started, ( - "Nested form tags at %s" % match.start()) - started = match.start() - assert not started, ( - "Danging form: %r" % self.testbody[started:]) - for i, text in enumerate(form_texts): - form = Form(self, text) - forms[i] = form - if form.id: - forms[form.id] = form - - def follow(self, **kw): - """ - If this request is a redirect, follow that redirect. It - is an error if this is not a redirect response. Returns - another response object. - """ - assert self.status_int >= 300 and self.status_int < 400, ( - "You can only follow redirect responses (not %s)" - % self.status) - location = self.headers['location'] - type, rest = urllib.splittype(location) - host, path = urllib.splithost(rest) - # @@: We should test that it's not a remote redirect - return self.test_app.get(location, **kw) - - def click(self, description=None, linkid=None, href=None, - anchor=None, index=None, verbose=False, - extra_environ=None): - """ - Click the link as described. Each of ``description``, - ``linkid``, and ``url`` are *patterns*, meaning that they are - either strings (regular expressions), compiled regular - expressions (objects with a ``search`` method), or callables - returning true or false. - - All the given patterns are ANDed together: - - * ``description`` is a pattern that matches the contents of the - anchor (HTML and all -- everything between ``<a...>`` and - ``</a>``) - - * ``linkid`` is a pattern that matches the ``id`` attribute of - the anchor. It will receive the empty string if no id is - given. - - * ``href`` is a pattern that matches the ``href`` of the anchor; - the literal content of that attribute, not the fully qualified - attribute. - - * ``anchor`` is a pattern that matches the entire anchor, with - its contents. - - If more than one link matches, then the ``index`` link is - followed. If ``index`` is not given and more than one link - matches, or if no link matches, then ``IndexError`` will be - raised. - - If you give ``verbose`` then messages will be printed about - each link, and why it does or doesn't match. If you use - ``app.click(verbose=True)`` you'll see a list of all the - links. - - You can use multiple criteria to essentially assert multiple - aspects about the link, e.g., where the link's destination is. - """ - __tracebackhide__ = True - found_html, found_desc, found_attrs = self._find_element( - tag='a', href_attr='href', - href_extract=None, - content=description, - id=linkid, - href_pattern=href, - html_pattern=anchor, - index=index, verbose=verbose) - return self.goto(found_attrs['uri'], extra_environ=extra_environ) - - def clickbutton(self, description=None, buttonid=None, href=None, - button=None, index=None, verbose=False): - """ - Like ``.click()``, except looks for link-like buttons. - This kind of button should look like - ``<button onclick="...location.href='url'...">``. - """ - __tracebackhide__ = True - found_html, found_desc, found_attrs = self._find_element( - tag='button', href_attr='onclick', - href_extract=re.compile(r"location\.href='(.*?)'"), - content=description, - id=buttonid, - href_pattern=href, - html_pattern=button, - index=index, verbose=verbose) - return self.goto(found_attrs['uri']) - - def _find_element(self, tag, href_attr, href_extract, - content, id, - href_pattern, - html_pattern, - index, verbose): - content_pat = _make_pattern(content) - id_pat = _make_pattern(id) - href_pat = _make_pattern(href_pattern) - html_pat = _make_pattern(html_pattern) - - _tag_re = re.compile(r'<%s\s+(.*?)>(.*?)</%s>' % (tag, tag), - re.I+re.S) - _script_re = re.compile(r'<script.*?>.*?</script>', re.I|re.S) - bad_spans = [] - for match in _script_re.finditer(self.testbody): - bad_spans.append((match.start(), match.end())) - - def printlog(s): - if verbose: - print(s) - - found_links = [] - total_links = 0 - for match in _tag_re.finditer(self.testbody): - found_bad = False - for bad_start, bad_end in bad_spans: - if (match.start() > bad_start - and match.end() < bad_end): - found_bad = True - break - if found_bad: - continue - el_html = match.group(0) - el_attr = match.group(1) - el_content = match.group(2) - attrs = _parse_attrs(el_attr) - if verbose: - printlog('Element: %r' % el_html) - if not attrs.get(href_attr): - printlog(' Skipped: no %s attribute' % href_attr) - continue - el_href = attrs[href_attr] - if href_extract: - m = href_extract.search(el_href) - if not m: - printlog(" Skipped: doesn't match extract pattern") - continue - el_href = m.group(1) - attrs['uri'] = el_href - if el_href.startswith('#'): - printlog(' Skipped: only internal fragment href') - continue - if el_href.startswith('javascript:'): - printlog(' Skipped: cannot follow javascript:') - continue - total_links += 1 - if content_pat and not content_pat(el_content): - printlog(" Skipped: doesn't match description") - continue - if id_pat and not id_pat(attrs.get('id', '')): - printlog(" Skipped: doesn't match id") - continue - if href_pat and not href_pat(el_href): - printlog(" Skipped: doesn't match href") - continue - if html_pat and not html_pat(el_html): - printlog(" Skipped: doesn't match html") - continue - printlog(" Accepted") - found_links.append((el_html, el_content, attrs)) - if not found_links: - raise IndexError( - "No matching elements found (from %s possible)" - % total_links) - if index is None: - if len(found_links) > 1: - raise IndexError( - "Multiple links match: %s" - % ', '.join([repr(anc) for anc, d, attr in found_links])) - found_link = found_links[0] - else: - try: - found_link = found_links[index] - except IndexError: - raise IndexError( - "Only %s (out of %s) links match; index %s out of range" - % (len(found_links), total_links, index)) - return found_link - - def goto(self, href, method='get', **args): - """ - Go to the (potentially relative) link ``href``, using the - given method (``'get'`` or ``'post'``) and any extra arguments - you want to pass to the ``app.get()`` or ``app.post()`` - methods. - - All hostnames and schemes will be ignored. - """ - scheme, host, path, query, fragment = urlparse.urlsplit(href) - # We - scheme = host = fragment = '' - href = urlparse.urlunsplit((scheme, host, path, query, fragment)) - href = urlparse.urljoin(self.request.url, href) - method = method.lower() - assert method in ('get', 'post'), ( - 'Only "get" or "post" are allowed for method (you gave %r)' - % method) - - # encode unicode strings for the outside world - if getattr(self, '_use_unicode', False): - def to_str(s): - if isinstance(s, unicode): - return s.encode(self.charset) - return s - - href = to_str(href) - - if 'params' in args: - args['params'] = [tuple(map(to_str, p)) for p in args['params']] - - if 'upload_files' in args: - args['upload_files'] = [map(to_str, f) for f in args['upload_files']] - - if 'content_type' in args: - args['content_type'] = to_str(args['content_type']) - - if method == 'get': - method = self.test_app.get - else: - method = self.test_app.post - return method(href, **args) - - _normal_body_regex = re.compile(r'[ \n\r\t]+') - - _normal_body = None - - def normal_body__get(self): - if self._normal_body is None: - self._normal_body = self._normal_body_regex.sub( - ' ', self.body) - return self._normal_body - - normal_body = property(normal_body__get, - doc=""" - Return the whitespace-normalized body - """.strip()) - - def unicode_normal_body__get(self): - if not self.charset: - raise AttributeError( - "You cannot access Response.unicode_normal_body unless charset is set") - return self.normal_body.decode(self.charset) - - unicode_normal_body = property( - unicode_normal_body__get, doc=""" - Return the whitespace-normalized body, as unicode - """.strip()) - - def __contains__(self, s): - """ - A response 'contains' a string if it is present in the body - of the response. Whitespace is normalized when searching - for a string. - """ - if not isinstance(s, basestring): - if hasattr(s, '__unicode__'): - s = unicode(s) - else: - s = str(s) - if isinstance(s, unicode): - body = self.unicode_body - normal_body = self.unicode_normal_body - else: - body = self.body - normal_body = self.normal_body - return s in body or s in normal_body - - def mustcontain(self, *strings, **kw): - """ - Assert that the response contains all of the strings passed - in as arguments. - - Equivalent to:: - - assert string in res - """ - if 'no' in kw: - no = kw['no'] - del kw['no'] - if isinstance(no, basestring): - no = [no] - else: - no = [] - if kw: - raise TypeError( - "The only keyword argument allowed is 'no'") - for s in strings: - if not s in self: - print_stderr("Actual response (no %r):" % s) - print_stderr(self) - raise IndexError( - "Body does not contain string %r" % s) - for no_s in no: - if no_s in self: - print_stderr("Actual response (has %r)" % no_s) - print_stderr(self) - raise IndexError( - "Body contains bad string %r" % no_s) - - def __str__(self): - simple_body = '\n'.join([l for l in self.body.splitlines() - if l.strip()]) - headers = [(self._normalize_header_name(n), v) - for n, v in self.headerlist - if n.lower() != 'content-length'] - headers.sort() - return 'Response: %s\n%s\n%s' % ( - self.status, - '\n'.join(['%s: %s' % (n, v) for n, v in headers]), - simple_body) - - def _normalize_header_name(self, name): - name = name.replace('-', ' ').title().replace(' ', '-') - return name - - def __repr__(self): - # Specifically intended for doctests - if self.content_type: - ct = ' %s' % self.content_type - else: - ct = '' - if self.body: - br = repr(self.body) - if len(br) > 18: - br = br[:10]+'...'+br[-5:] - br += '/%s' % len(self.body) - body = ' body=%s' % br - else: - body = ' no body' - if self.location: - location = ' location: %s' % self.location - else: - location = '' - return ('<' + self.status + ct + location + body + '>') - - def html(self): - """ - Returns the response as a `BeautifulSoup - <http://www.crummy.com/software/BeautifulSoup/documentation.html>`_ - object. - - Only works with HTML responses; other content-types raise - AttributeError. - """ - if 'html' not in self.content_type: - raise AttributeError( - "Not an HTML response body (content-type: %s)" - % self.content_type) - try: - from BeautifulSoup import BeautifulSoup - except ImportError: - raise ImportError( - "You must have BeautifulSoup installed to use response.html") - soup = BeautifulSoup(self.testbody) - return soup - - html = property(html, doc=html.__doc__) - - def xml(self): - """ - Returns the response as an `ElementTree - <http://python.org/doc/current/lib/module-xml.etree.ElementTree.html>`_ - object. - - Only works with XML responses; other content-types raise - AttributeError - """ - if 'xml' not in self.content_type: - raise AttributeError( - "Not an XML response body (content-type: %s)" - % self.content_type) - try: - from xml.etree import ElementTree - except ImportError: - try: - import ElementTree - except ImportError: - try: - from elementtree import ElementTree - except ImportError: - raise ImportError( - "You must have ElementTree installed (or use Python 2.5) to use response.xml") - # ElementTree can't parse unicode => use `body` instead of `testbody` - return ElementTree.XML(self.body) - - xml = property(xml, doc=xml.__doc__) - - def lxml(self): - """ - Returns the response as an `lxml object - <http://codespeak.net/lxml/>`_. You must have lxml installed - to use this. - - If this is an HTML response and you have lxml 2.x installed, - then an ``lxml.html.HTML`` object will be returned; if you - have an earlier version of lxml then a ``lxml.HTML`` object - will be returned. - """ - if ('html' not in self.content_type - and 'xml' not in self.content_type): - raise AttributeError( - "Not an XML or HTML response body (content-type: %s)" - % self.content_type) - try: - from lxml import etree - except ImportError: - raise ImportError( - "You must have lxml installed to use response.lxml") - try: - from lxml.html import fromstring - except ImportError: - fromstring = etree.HTML - ## FIXME: would be nice to set xml:base, in some fashion - if self.content_type == 'text/html': - return fromstring(self.testbody, base_url=self.request.url) - else: - return etree.XML(self.testbody, base_url=self.request.url) - - lxml = property(lxml, doc=lxml.__doc__) - - def json(self): - """ - Return the response as a JSON response. You must have - `simplejson - <http://svn.red-bean.com/bob/simplejson/tags/simplejson-1.7/docs/index.html>`_ - installed to use this, or be using a Python version with the - json module. - - The content type must be application/json to use this. - """ - if self.content_type != 'application/json': - raise AttributeError( - "Not a JSON response body (content-type: %s)" - % self.content_type) - try: - from simplejson import loads - except ImportError: - try: - from json import loads - except ImportError: - raise ImportError( - "You must have simplejson installed to use response.json") - return loads(self.testbody) - - json = property(json, doc=json.__doc__) - - def pyquery(self): - """ - Returns the response as a `PyQuery <http://pyquery.org/>`_ object. - - Only works with HTML and XML responses; other content-types raise - AttributeError. - """ - if 'html' not in self.content_type and 'xml' not in self.content_type: - raise AttributeError( - "Not an HTML or XML response body (content-type: %s)" - % self.content_type) - try: - from pyquery import PyQuery - except ImportError: - raise ImportError( - "You must have PyQuery installed to use response.pyquery") - d = PyQuery(self.testbody) - return d - - pyquery = property(pyquery, doc=pyquery.__doc__) - - def showbrowser(self): - """ - Show this response in a browser window (for debugging purposes, - when it's hard to read the HTML). - """ - import webbrowser - fn = tempnam_no_warning(None, 'webtest-page') + '.html' - f = open(fn, 'wb') - f.write(self.body) - f.close() - url = 'file:' + fn.replace(os.sep, '/') - webbrowser.open_new(url) - -class TestRequest(Request): - - # for py.test - disabled = True - ResponseClass = TestResponse - -class TestApp(object): - """ - Wraps a WSGI application in a more convenient interface for - testing. - - ``app`` may be an application, or a Paste Deploy app - URI, like ``'config:filename.ini#test'``. - - ``extra_environ`` is a dictionary of values that should go - into the environment for each request. These can provide a - communication channel with the application. - - ``relative_to`` is a directory, and filenames used for file - uploads are calculated relative to this. Also ``config:`` - URIs that aren't absolute. - """ - - # for py.test - disabled = True - RequestClass = TestRequest - - def __init__(self, app, extra_environ=None, relative_to=None, use_unicode=True): - if isinstance(app, (str, unicode)): - from paste.deploy import loadapp - # @@: Should pick up relative_to from calling module's - # __file__ - app = loadapp(app, relative_to=relative_to) - self.app = app - self.relative_to = relative_to - if extra_environ is None: - extra_environ = {} - self.extra_environ = extra_environ - self.use_unicode = use_unicode - self.reset() - - def reset(self): - """ - Resets the state of the application; currently just clears - saved cookies. - """ - self.cookies = {} - - def _make_environ(self, extra_environ=None): - environ = self.extra_environ.copy() - environ['paste.throw_errors'] = True - if extra_environ: - environ.update(extra_environ) - return environ - - def _remove_fragment(self, url): - scheme, netloc, path, query, fragment = urlparse.urlsplit(url) - return urlparse.urlunsplit((scheme, netloc, path, query, "")) - - def get(self, url, params=None, headers=None, extra_environ=None, - status=None, expect_errors=False): - """ - Get the given url (well, actually a path like - ``'/page.html'``). - - ``params``: - A query string, or a dictionary that will be encoded - into a query string. You may also include a query - string on the ``url``. - - ``headers``: - A dictionary of extra headers to send. - - ``extra_environ``: - A dictionary of environmental variables that should - be added to the request. - - ``status``: - The integer status code you expect (if not 200 or 3xx). - If you expect a 404 response, for instance, you must give - ``status=404`` or it will be an error. You can also give - a wildcard, like ``'3*'`` or ``'*'``. - - ``expect_errors``: - If this is not true, then if anything is written to - ``wsgi.errors`` it will be an error. If it is true, then - non-200/3xx responses are also okay. - - Returns a ``webob.Response`` object. - """ - environ = self._make_environ(extra_environ) - # Hide from py.test: - __tracebackhide__ = True - url = self._remove_fragment(url) - if params: - if not isinstance(params, (str, unicode)): - params = urllib.urlencode(params, doseq=True) - if '?' in url: - url += '&' - else: - url += '?' - url += params - url = str(url) - if '?' in url: - url, environ['QUERY_STRING'] = url.split('?', 1) - else: - environ['QUERY_STRING'] = '' - req = self.RequestClass.blank(url, environ) - if headers: - req.headers.update(headers) - return self.do_request(req, status=status, - expect_errors=expect_errors) - - def _gen_request(self, method, url, params='', headers=None, extra_environ=None, - status=None, upload_files=None, expect_errors=False, - content_type=None): - """ - Do a generic request. - """ - environ = self._make_environ(extra_environ) - # @@: Should this be all non-strings? - if isinstance(params, (list, tuple, dict)): - params = urllib.urlencode(params, doseq=True) - if hasattr(params, 'items'): - params = urllib.urlencode(params.items(), doseq=True) - if upload_files or (content_type and content_type.startswith('multipart')): - params = cgi.parse_qsl(params, keep_blank_values=True) - content_type, params = self.encode_multipart( - params, upload_files or ()) - environ['CONTENT_TYPE'] = content_type - elif params: - environ.setdefault('CONTENT_TYPE', 'application/x-www-form-urlencoded') - if '?' in url: - url, environ['QUERY_STRING'] = url.split('?', 1) - else: - environ['QUERY_STRING'] = '' - if content_type is not None: - environ['CONTENT_TYPE'] = content_type - environ['CONTENT_LENGTH'] = str(len(params)) - environ['REQUEST_METHOD'] = method - environ['wsgi.input'] = StringIO(params) - url = self._remove_fragment(url) - req = self.RequestClass.blank(url, environ) - if headers: - req.headers.update(headers) - return self.do_request(req, status=status, - expect_errors=expect_errors) - - def post(self, url, params='', headers=None, extra_environ=None, - status=None, upload_files=None, expect_errors=False, - content_type=None): - """ - Do a POST request. Very like the ``.get()`` method. - ``params`` are put in the body of the request. - - ``upload_files`` is for file uploads. It should be a list of - ``[(fieldname, filename, file_content)]``. You can also use - just ``[(fieldname, filename)]`` and the file content will be - read from disk. - - Returns a ``webob.Response`` object. - """ - return self._gen_request('POST', url, params=params, headers=headers, - extra_environ=extra_environ,status=status, - upload_files=upload_files, - expect_errors=expect_errors, - content_type=content_type) - - def put(self, url, params='', headers=None, extra_environ=None, - status=None, upload_files=None, expect_errors=False, - content_type=None): - """ - Do a PUT request. Very like the ``.put()`` method. - ``params`` are put in the body of the request, if params is a - tuple, dictionary, list, or iterator it will be urlencoded and - placed in the body as with a POST, if it is string it will not - be encoded, but placed in the body directly. - - Returns a ``webob.Response`` object. - """ - return self._gen_request('PUT', url, params=params, headers=headers, - extra_environ=extra_environ,status=status, - upload_files=upload_files, - expect_errors=expect_errors, - content_type=content_type) - - def delete(self, url, params='', headers=None, extra_environ=None, - status=None, expect_errors=False): - """ - Do a DELETE request. Very like the ``.get()`` method. - - Returns a ``webob.Response`` object. - """ - if params: - warnings.warn(('You are not supposed to send a body in a ' - 'DELETE request. Most web servers will ignore it'), - lint.WSGIWarning) - return self._gen_request('DELETE', url, params=params, headers=headers, - extra_environ=extra_environ,status=status, - upload_files=None, expect_errors=expect_errors) - - def head(self, url, headers=None, extra_environ=None, - status=None, expect_errors=False): - """ - Do a HEAD request. Very like the ``.get()`` method. - - Returns a ``webob.Response`` object. - """ - return self._gen_request('HEAD', url, headers=headers, - extra_environ=extra_environ,status=status, - upload_files=None, expect_errors=expect_errors) - - def encode_multipart(self, params, files): - """ - Encodes a set of parameters (typically a name/value list) and - a set of files (a list of (name, filename, file_body)) into a - typical POST body, returning the (content_type, body). - """ - boundary = '----------a_BoUnDaRy%s$' % random.random() - lines = [] - for key, value in params: - lines.append('--'+boundary) - lines.append('Content-Disposition: form-data; name="%s"' % key) - lines.append('') - lines.append(value) - for file_info in files: - key, filename, value = self._get_file_info(file_info) - lines.append('--'+boundary) - lines.append('Content-Disposition: form-data; name="%s"; filename="%s"' - % (key, filename)) - fcontent = mimetypes.guess_type(filename)[0] - lines.append('Content-Type: %s' % - (fcontent or 'application/octet-stream')) - lines.append('') - lines.append(value) - lines.append('--' + boundary + '--') - lines.append('') - body = '\r\n'.join(lines) - content_type = 'multipart/form-data; boundary=%s' % boundary - return content_type, body - - def _get_file_info(self, file_info): - if len(file_info) == 2: - # It only has a filename - filename = file_info[1] - if self.relative_to: - filename = os.path.join(self.relative_to, filename) - f = open(filename, 'rb') - content = f.read() - f.close() - return (file_info[0], filename, content) - elif len(file_info) == 3: - return file_info - else: - raise ValueError( - "upload_files need to be a list of tuples of (fieldname, " - "filename, filecontent) or (fieldname, filename); " - "you gave: %r" - % repr(file_info)[:100]) - - - def request(self, url_or_req, status=None, expect_errors=False, - **req_params): - """ - Creates and executes a request. You may either pass in an - instantiated :class:`TestRequest` object, or you may pass in a - URL and keyword arguments to be passed to - :meth:`TestRequest.blank`. - - You can use this to run a request without the intermediary - functioning of :meth:`TestApp.get` etc. For instance, to - test a WebDAV method:: - - resp = app.request('/new-col', method='MKCOL') - - Note that the request won't have a body unless you specify it, - like:: - - resp = app.request('/test.txt', method='PUT', body='test') - - You can use ``POST={args}`` to set the request body to the - serialized arguments, and simultaneously set the request - method to ``POST`` - """ - if isinstance(url_or_req, basestring): - req = self.RequestClass.blank(url_or_req, **req_params) - else: - req = url_or_req.copy() - for name, value in req_params.iteritems(): - setattr(req, name, value) - if req.content_length == -1: - req.content_length = len(req.body) - req.environ['paste.throw_errors'] = True - for name, value in self.extra_environ.iteritems(): - req.environ.setdefault(name, value) - return self.do_request(req, status=status, expect_errors=expect_errors) - - def do_request(self, req, status, expect_errors): - """ - Executes the given request (``req``), with the expected - ``status``. Generally ``.get()`` and ``.post()`` are used - instead. - - To use this:: - - resp = app.do_request(webtest.TestRequest.blank( - 'url', ...args...)) - - Note you can pass any keyword arguments to - ``TestRequest.blank()``, which will be set on the request. - These can be arguments like ``content_type``, ``accept``, etc. - """ - __tracebackhide__ = True - errors = StringIO() - req.environ['wsgi.errors'] = errors - script_name = req.environ.get('SCRIPT_NAME', '') - if script_name and req.path_info.startswith(script_name): - req.path_info = req.path_info[len(script_name):] - if self.cookies: - cookie_header = ''.join([ - '%s=%s; ' % (name, cookie_quote(value)) - for name, value in self.cookies.items()]) - req.environ['HTTP_COOKIE'] = cookie_header - req.environ['paste.testing'] = True - req.environ['paste.testing_variables'] = {} - app = lint.middleware(self.app) - old_stdout = sys.stdout - out = CaptureStdout(old_stdout) - try: - sys.stdout = out - start_time = time.time() - ## FIXME: should it be an option to not catch exc_info? - res = req.get_response(app, catch_exc_info=True) - res._use_unicode = self.use_unicode - end_time = time.time() - finally: - sys.stdout = old_stdout - res.app = app - res.test_app = self - # We do this to make sure the app_iter is exausted: - res.body - res.errors = errors.getvalue() - total_time = end_time - start_time - for name, value in req.environ['paste.testing_variables'].items(): - if hasattr(res, name): - raise ValueError( - "paste.testing_variables contains the variable %r, but " - "the response object already has an attribute by that " - "name" % name) - setattr(res, name, value) - if not expect_errors: - self._check_status(status, res) - self._check_errors(res) - res.cookies_set = {} - for header in res.headers.getall('set-cookie'): - try: - c = SimpleCookie(header) - except CookieError: - raise CookieError( - "Could not parse cookie header %r" % (header,)) - for key, morsel in c.items(): - self.cookies[key] = morsel.value - res.cookies_set[key] = morsel.value - return res - - def _check_status(self, status, res): - __tracebackhide__ = True - if status == '*': - return - if (isinstance(status, basestring) - and '*' in status): - if re.match(fnmatch.translate(status), res.status, re.I): - return - if isinstance(status, (list, tuple)): - if res.status_int not in status: - raise AppError( - "Bad response: %s (not one of %s for %s)\n%s" - % (res.status, ', '.join(map(str, status)), - res.request.url, res.body)) - return - if status is None: - if res.status_int >= 200 and res.status_int < 400: - return - raise AppError( - "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s" - % (res.status, res.request.url, - res.body)) - if status != res.status_int: - raise AppError( - "Bad response: %s (not %s)" % (res.status, status)) - - def _check_errors(self, res): - errors = res.errors - if errors: - raise AppError( - "Application had errors logged:\n%s" % errors) - - -######################################## -## Form objects -######################################## - - -_attr_re = re.compile(r'([^= \n\r\t]+)[ \n\r\t]*(?:=[ \n\r\t]*(?:"([^"]*)"|\'([^\']*)\'|([^"\'][^ \n\r\t>]*)))?', re.S) - -def _parse_attrs(text): - attrs = {} - for match in _attr_re.finditer(text): - attr_name = match.group(1).lower() - attr_body = match.group(2) or match.group(3) - attr_body = html_unquote(attr_body or '') - # python <= 2.5 doesn't like **dict when the keys are unicode - # so cast str on them. Unicode field attributes are not - # supported now (actually they have never been supported). - attrs[str(attr_name)] = attr_body - return attrs - -class Field(object): - - """ - Field object. - """ - - # Dictionary of field types (select, radio, etc) to classes - classes = {} - - settable = True - - def __init__(self, form, tag, name, pos, - value=None, id=None, **attrs): - self.form = form - self.tag = tag - self.name = name - self.pos = pos - self._value = value - self.id = id - self.attrs = attrs - - def value__set(self, value): - if not self.settable: - raise AttributeError( - "You cannot set the value of the <%s> field %r" - % (self.tag, self.name)) - self._value = value - - def force_value(self, value): - """ - Like setting a value, except forces it even for, say, hidden - fields. - """ - self._value = value - - def value__get(self): - return self._value - - value = property(value__get, value__set) - -class NoValue(object): - pass - -class Select(Field): - - """ - Field representing ``<select>`` - """ - - def __init__(self, *args, **attrs): - super(Select, self).__init__(*args, **attrs) - self.options = [] - # Undetermined yet: - self.selectedIndex = None - # we have no forced value - self._forced_value = NoValue - - def force_value(self, value): - self._forced_value = value - - def value__set(self, value): - if self._forced_value is not NoValue: - self._forced_value = NoValue - for i, (option, checked) in enumerate(self.options): - if option == _stringify(value): - self.selectedIndex = i - break - else: - raise ValueError( - "Option %r not found (from %s)" - % (value, ', '.join( - [repr(o) for o, c in self.options]))) - - def value__get(self): - if self._forced_value is not NoValue: - return self._forced_value - elif self.selectedIndex is not None: - return self.options[self.selectedIndex][0] - else: - for option, checked in self.options: - if checked: - return option - else: - if self.options: - return self.options[0][0] - else: - return None - - value = property(value__get, value__set) - -Field.classes['select'] = Select - -class MultipleSelect(Field): - - """ - Field representing ``<select multiple="multiple">`` - """ - - def __init__(self, *args, **attrs): - super(MultipleSelect, self).__init__(*args, **attrs) - self.options = [] - # Undetermined yet: - self.selectedIndices = [] - self._forced_values = [] - - def force_value(self, values): - self._forced_values = values - self.selectedIndices = [] - - def value__set(self, values): - str_values = [_stringify(value) for value in values] - self.selectedIndicies = [] - for i, (option, checked) in enumerate(self.options): - if option in str_values: - self.selectedIndices.append(i) - str_values.remove(option) - if str_values: - raise ValueError( - "Option(s) %r not found (from %s)" - % (', '.join(str_values), - ', '.join( - [repr(o) for o, c in self.options]))) - - def value__get(self): - selected_values = [] - if self.selectedIndices: - selected_values = [self.options[i][0] for i in self.selectedIndices] - elif not self._forced_values: - selected_values = [] - for option, checked in self.options: - if checked: - selected_values.append(option) - if self._forced_values: - selected_values += self._forced_values - - if self.options and (not selected_values): - selected_values = None - return selected_values - value = property(value__get, value__set) - -Field.classes['multiple_select'] = MultipleSelect - -class Radio(Select): - - """ - Field representing ``<input type="radio">`` - """ - - def value__get(self): - if self.selectedIndex is not None: - return self.options[self.selectedIndex][0] - else: - for option, checked in self.options: - if checked: - return option - else: - return None - - value = property(value__get, Select.value__set) - - -Field.classes['radio'] = Radio - -class Checkbox(Field): - - """ - Field representing ``<input type="checkbox">`` - """ - - def __init__(self, *args, **attrs): - super(Checkbox, self).__init__(*args, **attrs) - self.checked = 'checked' in attrs - - def value__set(self, value): - self.checked = not not value - - def value__get(self): - if self.checked: - if self._value is None: - return 'on' - else: - return self._value - else: - return None - - value = property(value__get, value__set) - -Field.classes['checkbox'] = Checkbox - -class Text(Field): - """ - Field representing ``<input type="text">`` - """ - - def value__get(self): - if self._value is None: - return '' - else: - return self._value - - value = property(value__get, Field.value__set) - -Field.classes['text'] = Text - - -class File(Field): - """ - Field representing ``<input type="file">`` - """ - - ## FIXME: This doesn't actually handle file uploads and enctype - def value__get(self): - if self._value is None: - return '' - else: - return self._value - - value = property(value__get, Field.value__set) - -Field.classes['file'] = File - -class Textarea(Text): - """ - Field representing ``<textarea>`` - """ - -Field.classes['textarea'] = Textarea - -class Hidden(Text): - """ - Field representing ``<input type="hidden">`` - """ - -Field.classes['hidden'] = Hidden - -class Submit(Field): - """ - Field representing ``<input type="submit">`` and ``<button>`` - """ - - settable = False - - def value__get(self): - return None - - value = property(value__get) - - def value_if_submitted(self): - return self._value - -Field.classes['submit'] = Submit - -Field.classes['button'] = Submit - -Field.classes['image'] = Submit - -class Form(object): - - """ - This object represents a form that has been found in a page. - This has a couple useful attributes: - - ``text``: - the full HTML of the form. - - ``action``: - the relative URI of the action. - - ``method``: - the method (e.g., ``'GET'``). - - ``id``: - the id, or None if not given. - - ``fields``: - a dictionary of fields, each value is a list of fields by - that name. ``<input type=\"radio\">`` and ``<select>`` are - both represented as single fields with multiple options. - """ - - # @@: This really should be using Mechanize/ClientForm or - # something... - - _tag_re = re.compile(r'<(/?)([a-z0-9_\-]*)([^>]*?)>', re.I) - - FieldClass = Field - - def __init__(self, response, text): - self.response = response - self.text = text - self._parse_fields() - self._parse_action() - - def _parse_fields(self): - in_select = None - in_textarea = None - fields = {} - for match in self._tag_re.finditer(self.text): - end = match.group(1) == '/' - tag = match.group(2).lower() - if tag not in ('input', 'select', 'option', 'textarea', - 'button'): - continue - if tag == 'select' and end: - assert in_select, ( - '%r without starting select' % match.group(0)) - in_select = None - continue - if tag == 'textarea' and end: - assert in_textarea, ( - "</textarea> with no <textarea> at %s" % match.start()) - in_textarea[0].value = html_unquote(self.text[in_textarea[1]:match.start()]) - in_textarea = None - continue - if end: - continue - attrs = _parse_attrs(match.group(3)) - if 'name' in attrs: - name = attrs.pop('name') - else: - name = None - if tag == 'option': - in_select.options.append((attrs.get('value'), - 'selected' in attrs)) - continue - if tag == 'input' and attrs.get('type') == 'radio': - field = fields.get(name) - if not field: - field = self.FieldClass.classes['radio'](self, tag, name, match.start(), **attrs) - fields.setdefault(name, []).append(field) - else: - field = field[0] - assert isinstance(field, self.FieldClass.classes['radio']) - field.options.append((attrs.get('value'), - 'checked' in attrs)) - continue - tag_type = tag - if tag == 'input': - tag_type = attrs.get('type', 'text').lower() - if tag_type == "select" and attrs.get("multiple"): - FieldClass = self.FieldClass.classes.get("multiple_select", self.FieldClass) - else: - FieldClass = self.FieldClass.classes.get(tag_type, self.FieldClass) - field = FieldClass(self, tag, name, match.start(), **attrs) - if tag == 'textarea': - assert not in_textarea, ( - "Nested textareas: %r and %r" - % (in_textarea, match.group(0))) - in_textarea = field, match.end() - elif tag == 'select': - assert not in_select, ( - "Nested selects: %r and %r" - % (in_select, match.group(0))) - in_select = field - fields.setdefault(name, []).append(field) - self.fields = fields - - def _parse_action(self): - self.action = None - for match in self._tag_re.finditer(self.text): - end = match.group(1) == '/' - tag = match.group(2).lower() - if tag != 'form': - continue - if end: - break - attrs = _parse_attrs(match.group(3)) - self.action = attrs.get('action', '') - self.method = attrs.get('method', 'GET') - self.id = attrs.get('id') - self.enctype = attrs.get('enctype', 'application/x-www-form-urlencoded') - else: - assert 0, "No </form> tag found" - assert self.action is not None, ( - "No <form> tag found") - - def __setitem__(self, name, value): - """ - Set the value of the named field. If there is 0 or multiple - fields by that name, it is an error. - - Setting the value of a ``<select>`` selects the given option - (and confirms it is an option). Setting radio fields does the - same. Checkboxes get boolean values. You cannot set hidden - fields or buttons. - - Use ``.set()`` if there is any ambiguity and you must provide - an index. - """ - fields = self.fields.get(name) - assert fields is not None, ( - "No field by the name %r found (fields: %s)" - % (name, ', '.join(map(repr, self.fields.keys())))) - assert len(fields) == 1, ( - "Multiple fields match %r: %s" - % (name, ', '.join(map(repr, fields)))) - fields[0].value = value - - def __getitem__(self, name): - """ - Get the named field object (ambiguity is an error). - """ - fields = self.fields.get(name) - assert fields is not None, ( - "No field by the name %r found" % name) - assert len(fields) == 1, ( - "Multiple fields match %r: %s" - % (name, ', '.join(map(repr, fields)))) - return fields[0] - - def set(self, name, value, index=None): - """ - Set the given name, using ``index`` to disambiguate. - """ - if index is None: - self[name] = value - else: - fields = self.fields.get(name) - assert fields is not None, ( - "No fields found matching %r" % name) - field = fields[index] - field.value = value - - def get(self, name, index=None, default=NoDefault): - """ - Get the named/indexed field object, or ``default`` if no field - is found. - """ - fields = self.fields.get(name) - if fields is None and default is not NoDefault: - return default - if index is None: - return self[name] - else: - fields = self.fields.get(name) - assert fields is not None, ( - "No fields found matching %r" % name) - field = fields[index] - return field - - def select(self, name, value, index=None): - """ - Like ``.set()``, except also confirms the target is a - ``<select>``. - """ - field = self.get(name, index=index) - assert isinstance(field, Select) - field.value = value - - def submit(self, name=None, index=None, **args): - """ - Submits the form. If ``name`` is given, then also select that - button (using ``index`` to disambiguate)``. - - Any extra keyword arguments are passed to the ``.get()`` or - ``.post()`` method. - - Returns a response object. - """ - fields = self.submit_fields(name, index=index) - uploads = self.upload_fields() - if uploads: - args["upload_files"] = uploads - if self.method != "GET": - args.setdefault("content_type", self.enctype) - return self.response.goto(self.action, method=self.method, - params=fields, **args) - - def upload_fields(self): - """ - Return a list of file field tuples of the form: - (field name, file name) - or - (field name, file name, file contents). - """ - uploads = [] - for name, fields in self.fields.items(): - for field in fields: - if isinstance(field, File) and field.value: - uploads.append([name] + list(field.value)) - return uploads - - def submit_fields(self, name=None, index=None): - """ - Return a list of ``[(name, value), ...]`` for the current - state of the form. - """ - submit = [] - if name is not None: - field = self.get(name, index=index) - submit.append((field.name, field.value_if_submitted())) - for name, fields in self.fields.items(): - if name is None: - continue - for field in fields: - value = field.value - if value is None: - continue - if isinstance(field, File): - # skip file uploads; they need to be accounted - # for differently - continue - if isinstance(value, list): - for item in value: - submit.append((name, item)) - else: - submit.append((name, value)) - return submit - -######################################## -## Utility functions -######################################## - -def _stringify(value): - if isinstance(value, unicode): - return value - return str(value) - -def _popget(d, key, default=None): - """ - Pop the key if found (else return default) - """ - if key in d: - return d.pop(key) - return default - -def _space_prefix(pref, full, sep=None, indent=None, include_sep=True): - """ - Anything shared by pref and full will be replaced with spaces - in full, and full returned. - """ - if sep is None: - sep = os.path.sep - pref = pref.split(sep) - full = full.split(sep) - padding = [] - while pref and full and pref[0] == full[0]: - if indent is None: - padding.append(' ' * (len(full[0]) + len(sep))) - else: - padding.append(' ' * indent) - full.pop(0) - pref.pop(0) - if padding: - if include_sep: - return ''.join(padding) + sep + sep.join(full) - else: - return ''.join(padding) + sep.join(full) - else: - return sep.join(full) - -def _make_pattern(pat): - if pat is None: - return None - if isinstance(pat, (str, unicode)): - pat = re.compile(pat) - if hasattr(pat, 'search'): - return pat.search - if callable(pat): - return pat - assert 0, ( - "Cannot make callable pattern object out of %r" % pat) - -def html_unquote(v): - """ - Unquote (some) entities in HTML. (incomplete) - """ - for ent, repl in [(' ', ' '), ('>', '>'), - ('<', '<'), ('"', '"'), - ('&', '&')]: - v = v.replace(ent, repl) - return v diff --git a/webtest/testapp.py b/webtest/testapp.py new file mode 100644 index 0000000..9aa9b3c --- /dev/null +++ b/webtest/testapp.py @@ -0,0 +1,1609 @@ +# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) +# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php +""" +Routines for testing WSGI applications. + +Most interesting is TestApp +""" + +import sys +import random +import urllib +import warnings +import mimetypes +import time +import cgi +import os +import re +import fnmatch +from webtest.compat import urlparse +from webtest.compat import print_stderr +from webtest.compat import StringIO +from webtest.compat import SimpleCookie, CookieError +from webtest.compat import cookie_quote +from webob import Request, Response +from webtest import lint + +__all__ = ['TestApp', 'TestRequest'] + +def tempnam_no_warning(*args): + """ + An os.tempnam with the warning turned off, because sometimes + you just need to use this and don't care about the stupid + security warning. + """ + return os.tempnam(*args) + +class NoDefault(object): + pass + +try: + sorted +except NameError: + def sorted(l): + l = list(l) + l.sort() + return l + +class AppError(Exception): + pass + +class CaptureStdout(object): + + def __init__(self, actual): + self.captured = StringIO() + self.actual = actual + + def write(self, s): + self.captured.write(s) + self.actual.write(s) + + def flush(self): + self.actual.flush() + + def writelines(self, lines): + for item in lines: + self.write(item) + + def getvalue(self): + return self.captured.getvalue() + +class TestResponse(Response): + + """ + Instances of this class are return by ``TestApp`` + """ + + _forms_indexed = None + + + def forms__get(self): + """ + Returns a dictionary of ``Form`` objects. Indexes are both in + order (from zero) and by form id (if the form is given an id). + """ + if self._forms_indexed is None: + self._parse_forms() + return self._forms_indexed + + forms = property(forms__get, + doc=""" + A list of <form>s found on the page (instances of + ``Form``) + """) + + def form__get(self): + forms = self.forms + if not forms: + raise TypeError( + "You used response.form, but no forms exist") + if 1 in forms: + # There is more than one form + raise TypeError( + "You used response.form, but more than one form exists") + return forms[0] + + form = property(form__get, + doc=""" + Returns a single ``Form`` instance; it + is an error if there are multiple forms on the + page. + """) + + @property + def testbody(self): + if getattr(self, '_use_unicode', True) and self.charset: + return self.unicode_body + return self.body + + _tag_re = re.compile(r'<(/?)([:a-z0-9_\-]*)(.*?)>', re.S|re.I) + + def _parse_forms(self): + forms = self._forms_indexed = {} + form_texts = [] + started = None + for match in self._tag_re.finditer(self.testbody): + end = match.group(1) == '/' + tag = match.group(2).lower() + if tag != 'form': + continue + if end: + assert started, ( + "</form> unexpected at %s" % match.start()) + form_texts.append(self.testbody[started:match.end()]) + started = None + else: + assert not started, ( + "Nested form tags at %s" % match.start()) + started = match.start() + assert not started, ( + "Danging form: %r" % self.testbody[started:]) + for i, text in enumerate(form_texts): + form = Form(self, text) + forms[i] = form + if form.id: + forms[form.id] = form + + def follow(self, **kw): + """ + If this request is a redirect, follow that redirect. It + is an error if this is not a redirect response. Returns + another response object. + """ + assert self.status_int >= 300 and self.status_int < 400, ( + "You can only follow redirect responses (not %s)" + % self.status) + location = self.headers['location'] + type, rest = urllib.splittype(location) + host, path = urllib.splithost(rest) + # @@: We should test that it's not a remote redirect + return self.test_app.get(location, **kw) + + def click(self, description=None, linkid=None, href=None, + anchor=None, index=None, verbose=False, + extra_environ=None): + """ + Click the link as described. Each of ``description``, + ``linkid``, and ``url`` are *patterns*, meaning that they are + either strings (regular expressions), compiled regular + expressions (objects with a ``search`` method), or callables + returning true or false. + + All the given patterns are ANDed together: + + * ``description`` is a pattern that matches the contents of the + anchor (HTML and all -- everything between ``<a...>`` and + ``</a>``) + + * ``linkid`` is a pattern that matches the ``id`` attribute of + the anchor. It will receive the empty string if no id is + given. + + * ``href`` is a pattern that matches the ``href`` of the anchor; + the literal content of that attribute, not the fully qualified + attribute. + + * ``anchor`` is a pattern that matches the entire anchor, with + its contents. + + If more than one link matches, then the ``index`` link is + followed. If ``index`` is not given and more than one link + matches, or if no link matches, then ``IndexError`` will be + raised. + + If you give ``verbose`` then messages will be printed about + each link, and why it does or doesn't match. If you use + ``app.click(verbose=True)`` you'll see a list of all the + links. + + You can use multiple criteria to essentially assert multiple + aspects about the link, e.g., where the link's destination is. + """ + __tracebackhide__ = True + found_html, found_desc, found_attrs = self._find_element( + tag='a', href_attr='href', + href_extract=None, + content=description, + id=linkid, + href_pattern=href, + html_pattern=anchor, + index=index, verbose=verbose) + return self.goto(found_attrs['uri'], extra_environ=extra_environ) + + def clickbutton(self, description=None, buttonid=None, href=None, + button=None, index=None, verbose=False): + """ + Like ``.click()``, except looks for link-like buttons. + This kind of button should look like + ``<button onclick="...location.href='url'...">``. + """ + __tracebackhide__ = True + found_html, found_desc, found_attrs = self._find_element( + tag='button', href_attr='onclick', + href_extract=re.compile(r"location\.href='(.*?)'"), + content=description, + id=buttonid, + href_pattern=href, + html_pattern=button, + index=index, verbose=verbose) + return self.goto(found_attrs['uri']) + + def _find_element(self, tag, href_attr, href_extract, + content, id, + href_pattern, + html_pattern, + index, verbose): + content_pat = _make_pattern(content) + id_pat = _make_pattern(id) + href_pat = _make_pattern(href_pattern) + html_pat = _make_pattern(html_pattern) + + _tag_re = re.compile(r'<%s\s+(.*?)>(.*?)</%s>' % (tag, tag), + re.I+re.S) + _script_re = re.compile(r'<script.*?>.*?</script>', re.I|re.S) + bad_spans = [] + for match in _script_re.finditer(self.testbody): + bad_spans.append((match.start(), match.end())) + + def printlog(s): + if verbose: + print(s) + + found_links = [] + total_links = 0 + for match in _tag_re.finditer(self.testbody): + found_bad = False + for bad_start, bad_end in bad_spans: + if (match.start() > bad_start + and match.end() < bad_end): + found_bad = True + break + if found_bad: + continue + el_html = match.group(0) + el_attr = match.group(1) + el_content = match.group(2) + attrs = _parse_attrs(el_attr) + if verbose: + printlog('Element: %r' % el_html) + if not attrs.get(href_attr): + printlog(' Skipped: no %s attribute' % href_attr) + continue + el_href = attrs[href_attr] + if href_extract: + m = href_extract.search(el_href) + if not m: + printlog(" Skipped: doesn't match extract pattern") + continue + el_href = m.group(1) + attrs['uri'] = el_href + if el_href.startswith('#'): + printlog(' Skipped: only internal fragment href') + continue + if el_href.startswith('javascript:'): + printlog(' Skipped: cannot follow javascript:') + continue + total_links += 1 + if content_pat and not content_pat(el_content): + printlog(" Skipped: doesn't match description") + continue + if id_pat and not id_pat(attrs.get('id', '')): + printlog(" Skipped: doesn't match id") + continue + if href_pat and not href_pat(el_href): + printlog(" Skipped: doesn't match href") + continue + if html_pat and not html_pat(el_html): + printlog(" Skipped: doesn't match html") + continue + printlog(" Accepted") + found_links.append((el_html, el_content, attrs)) + if not found_links: + raise IndexError( + "No matching elements found (from %s possible)" + % total_links) + if index is None: + if len(found_links) > 1: + raise IndexError( + "Multiple links match: %s" + % ', '.join([repr(anc) for anc, d, attr in found_links])) + found_link = found_links[0] + else: + try: + found_link = found_links[index] + except IndexError: + raise IndexError( + "Only %s (out of %s) links match; index %s out of range" + % (len(found_links), total_links, index)) + return found_link + + def goto(self, href, method='get', **args): + """ + Go to the (potentially relative) link ``href``, using the + given method (``'get'`` or ``'post'``) and any extra arguments + you want to pass to the ``app.get()`` or ``app.post()`` + methods. + + All hostnames and schemes will be ignored. + """ + scheme, host, path, query, fragment = urlparse.urlsplit(href) + # We + scheme = host = fragment = '' + href = urlparse.urlunsplit((scheme, host, path, query, fragment)) + href = urlparse.urljoin(self.request.url, href) + method = method.lower() + assert method in ('get', 'post'), ( + 'Only "get" or "post" are allowed for method (you gave %r)' + % method) + + # encode unicode strings for the outside world + if getattr(self, '_use_unicode', False): + def to_str(s): + if isinstance(s, unicode): + return s.encode(self.charset) + return s + + href = to_str(href) + + if 'params' in args: + args['params'] = [tuple(map(to_str, p)) for p in args['params']] + + if 'upload_files' in args: + args['upload_files'] = [map(to_str, f) for f in args['upload_files']] + + if 'content_type' in args: + args['content_type'] = to_str(args['content_type']) + + if method == 'get': + method = self.test_app.get + else: + method = self.test_app.post + return method(href, **args) + + _normal_body_regex = re.compile(r'[ \n\r\t]+') + + _normal_body = None + + def normal_body__get(self): + if self._normal_body is None: + self._normal_body = self._normal_body_regex.sub( + ' ', self.body) + return self._normal_body + + normal_body = property(normal_body__get, + doc=""" + Return the whitespace-normalized body + """.strip()) + + def unicode_normal_body__get(self): + if not self.charset: + raise AttributeError( + "You cannot access Response.unicode_normal_body unless charset is set") + return self.normal_body.decode(self.charset) + + unicode_normal_body = property( + unicode_normal_body__get, doc=""" + Return the whitespace-normalized body, as unicode + """.strip()) + + def __contains__(self, s): + """ + A response 'contains' a string if it is present in the body + of the response. Whitespace is normalized when searching + for a string. + """ + if not isinstance(s, basestring): + if hasattr(s, '__unicode__'): + s = unicode(s) + else: + s = str(s) + if isinstance(s, unicode): + body = self.unicode_body + normal_body = self.unicode_normal_body + else: + body = self.body + normal_body = self.normal_body + return s in body or s in normal_body + + def mustcontain(self, *strings, **kw): + """ + Assert that the response contains all of the strings passed + in as arguments. + + Equivalent to:: + + assert string in res + """ + if 'no' in kw: + no = kw['no'] + del kw['no'] + if isinstance(no, basestring): + no = [no] + else: + no = [] + if kw: + raise TypeError( + "The only keyword argument allowed is 'no'") + for s in strings: + if not s in self: + print_stderr("Actual response (no %r):" % s) + print_stderr(self) + raise IndexError( + "Body does not contain string %r" % s) + for no_s in no: + if no_s in self: + print_stderr("Actual response (has %r)" % no_s) + print_stderr(self) + raise IndexError( + "Body contains bad string %r" % no_s) + + def __str__(self): + simple_body = '\n'.join([l for l in self.body.splitlines() + if l.strip()]) + headers = [(self._normalize_header_name(n), v) + for n, v in self.headerlist + if n.lower() != 'content-length'] + headers.sort() + return 'Response: %s\n%s\n%s' % ( + self.status, + '\n'.join(['%s: %s' % (n, v) for n, v in headers]), + simple_body) + + def _normalize_header_name(self, name): + name = name.replace('-', ' ').title().replace(' ', '-') + return name + + def __repr__(self): + # Specifically intended for doctests + if self.content_type: + ct = ' %s' % self.content_type + else: + ct = '' + if self.body: + br = repr(self.body) + if len(br) > 18: + br = br[:10]+'...'+br[-5:] + br += '/%s' % len(self.body) + body = ' body=%s' % br + else: + body = ' no body' + if self.location: + location = ' location: %s' % self.location + else: + location = '' + return ('<' + self.status + ct + location + body + '>') + + def html(self): + """ + Returns the response as a `BeautifulSoup + <http://www.crummy.com/software/BeautifulSoup/documentation.html>`_ + object. + + Only works with HTML responses; other content-types raise + AttributeError. + """ + if 'html' not in self.content_type: + raise AttributeError( + "Not an HTML response body (content-type: %s)" + % self.content_type) + try: + from BeautifulSoup import BeautifulSoup + except ImportError: + raise ImportError( + "You must have BeautifulSoup installed to use response.html") + soup = BeautifulSoup(self.testbody) + return soup + + html = property(html, doc=html.__doc__) + + def xml(self): + """ + Returns the response as an `ElementTree + <http://python.org/doc/current/lib/module-xml.etree.ElementTree.html>`_ + object. + + Only works with XML responses; other content-types raise + AttributeError + """ + if 'xml' not in self.content_type: + raise AttributeError( + "Not an XML response body (content-type: %s)" + % self.content_type) + try: + from xml.etree import ElementTree + except ImportError: + try: + import ElementTree + except ImportError: + try: + from elementtree import ElementTree + except ImportError: + raise ImportError( + "You must have ElementTree installed (or use Python 2.5) to use response.xml") + # ElementTree can't parse unicode => use `body` instead of `testbody` + return ElementTree.XML(self.body) + + xml = property(xml, doc=xml.__doc__) + + def lxml(self): + """ + Returns the response as an `lxml object + <http://codespeak.net/lxml/>`_. You must have lxml installed + to use this. + + If this is an HTML response and you have lxml 2.x installed, + then an ``lxml.html.HTML`` object will be returned; if you + have an earlier version of lxml then a ``lxml.HTML`` object + will be returned. + """ + if ('html' not in self.content_type + and 'xml' not in self.content_type): + raise AttributeError( + "Not an XML or HTML response body (content-type: %s)" + % self.content_type) + try: + from lxml import etree + except ImportError: + raise ImportError( + "You must have lxml installed to use response.lxml") + try: + from lxml.html import fromstring + except ImportError: + fromstring = etree.HTML + ## FIXME: would be nice to set xml:base, in some fashion + if self.content_type == 'text/html': + return fromstring(self.testbody, base_url=self.request.url) + else: + return etree.XML(self.testbody, base_url=self.request.url) + + lxml = property(lxml, doc=lxml.__doc__) + + def json(self): + """ + Return the response as a JSON response. You must have + `simplejson + <http://svn.red-bean.com/bob/simplejson/tags/simplejson-1.7/docs/index.html>`_ + installed to use this, or be using a Python version with the + json module. + + The content type must be application/json to use this. + """ + if self.content_type != 'application/json': + raise AttributeError( + "Not a JSON response body (content-type: %s)" + % self.content_type) + try: + from simplejson import loads + except ImportError: + try: + from json import loads + except ImportError: + raise ImportError( + "You must have simplejson installed to use response.json") + return loads(self.testbody) + + json = property(json, doc=json.__doc__) + + def pyquery(self): + """ + Returns the response as a `PyQuery <http://pyquery.org/>`_ object. + + Only works with HTML and XML responses; other content-types raise + AttributeError. + """ + if 'html' not in self.content_type and 'xml' not in self.content_type: + raise AttributeError( + "Not an HTML or XML response body (content-type: %s)" + % self.content_type) + try: + from pyquery import PyQuery + except ImportError: + raise ImportError( + "You must have PyQuery installed to use response.pyquery") + d = PyQuery(self.testbody) + return d + + pyquery = property(pyquery, doc=pyquery.__doc__) + + def showbrowser(self): + """ + Show this response in a browser window (for debugging purposes, + when it's hard to read the HTML). + """ + import webbrowser + fn = tempnam_no_warning(None, 'webtest-page') + '.html' + f = open(fn, 'wb') + f.write(self.body) + f.close() + url = 'file:' + fn.replace(os.sep, '/') + webbrowser.open_new(url) + +class TestRequest(Request): + + # for py.test + disabled = True + ResponseClass = TestResponse + +class TestApp(object): + """ + Wraps a WSGI application in a more convenient interface for + testing. + + ``app`` may be an application, or a Paste Deploy app + URI, like ``'config:filename.ini#test'``. + + ``extra_environ`` is a dictionary of values that should go + into the environment for each request. These can provide a + communication channel with the application. + + ``relative_to`` is a directory, and filenames used for file + uploads are calculated relative to this. Also ``config:`` + URIs that aren't absolute. + """ + + # for py.test + disabled = True + RequestClass = TestRequest + + def __init__(self, app, extra_environ=None, relative_to=None, use_unicode=True): + if isinstance(app, (str, unicode)): + from paste.deploy import loadapp + # @@: Should pick up relative_to from calling module's + # __file__ + app = loadapp(app, relative_to=relative_to) + self.app = app + self.relative_to = relative_to + if extra_environ is None: + extra_environ = {} + self.extra_environ = extra_environ + self.use_unicode = use_unicode + self.reset() + + def reset(self): + """ + Resets the state of the application; currently just clears + saved cookies. + """ + self.cookies = {} + + def _make_environ(self, extra_environ=None): + environ = self.extra_environ.copy() + environ['paste.throw_errors'] = True + if extra_environ: + environ.update(extra_environ) + return environ + + def _remove_fragment(self, url): + scheme, netloc, path, query, fragment = urlparse.urlsplit(url) + return urlparse.urlunsplit((scheme, netloc, path, query, "")) + + def get(self, url, params=None, headers=None, extra_environ=None, + status=None, expect_errors=False): + """ + Get the given url (well, actually a path like + ``'/page.html'``). + + ``params``: + A query string, or a dictionary that will be encoded + into a query string. You may also include a query + string on the ``url``. + + ``headers``: + A dictionary of extra headers to send. + + ``extra_environ``: + A dictionary of environmental variables that should + be added to the request. + + ``status``: + The integer status code you expect (if not 200 or 3xx). + If you expect a 404 response, for instance, you must give + ``status=404`` or it will be an error. You can also give + a wildcard, like ``'3*'`` or ``'*'``. + + ``expect_errors``: + If this is not true, then if anything is written to + ``wsgi.errors`` it will be an error. If it is true, then + non-200/3xx responses are also okay. + + Returns a :class:`webtest.TestResponse` object. + """ + environ = self._make_environ(extra_environ) + # Hide from py.test: + __tracebackhide__ = True + url = self._remove_fragment(url) + if params: + if not isinstance(params, (str, unicode)): + params = urllib.urlencode(params, doseq=True) + if '?' in url: + url += '&' + else: + url += '?' + url += params + url = str(url) + if '?' in url: + url, environ['QUERY_STRING'] = url.split('?', 1) + else: + environ['QUERY_STRING'] = '' + req = self.RequestClass.blank(url, environ) + if headers: + req.headers.update(headers) + return self.do_request(req, status=status, + expect_errors=expect_errors) + + def _gen_request(self, method, url, params='', headers=None, extra_environ=None, + status=None, upload_files=None, expect_errors=False, + content_type=None): + """ + Do a generic request. + """ + environ = self._make_environ(extra_environ) + # @@: Should this be all non-strings? + if isinstance(params, (list, tuple, dict)): + params = urllib.urlencode(params, doseq=True) + if hasattr(params, 'items'): + params = urllib.urlencode(params.items(), doseq=True) + if upload_files or (content_type and content_type.startswith('multipart')): + params = cgi.parse_qsl(params, keep_blank_values=True) + content_type, params = self.encode_multipart( + params, upload_files or ()) + environ['CONTENT_TYPE'] = content_type + elif params: + environ.setdefault('CONTENT_TYPE', 'application/x-www-form-urlencoded') + if '?' in url: + url, environ['QUERY_STRING'] = url.split('?', 1) + else: + environ['QUERY_STRING'] = '' + if content_type is not None: + environ['CONTENT_TYPE'] = content_type + environ['CONTENT_LENGTH'] = str(len(params)) + environ['REQUEST_METHOD'] = method + environ['wsgi.input'] = StringIO(params) + url = self._remove_fragment(url) + req = self.RequestClass.blank(url, environ) + if headers: + req.headers.update(headers) + return self.do_request(req, status=status, + expect_errors=expect_errors) + + def post(self, url, params='', headers=None, extra_environ=None, + status=None, upload_files=None, expect_errors=False, + content_type=None): + """ + Do a POST request. Very like the ``.get()`` method. + ``params`` are put in the body of the request. + + ``upload_files`` is for file uploads. It should be a list of + ``[(fieldname, filename, file_content)]``. You can also use + just ``[(fieldname, filename)]`` and the file content will be + read from disk. + + Returns a ``webob.Response`` object. + """ + return self._gen_request('POST', url, params=params, headers=headers, + extra_environ=extra_environ,status=status, + upload_files=upload_files, + expect_errors=expect_errors, + content_type=content_type) + + def put(self, url, params='', headers=None, extra_environ=None, + status=None, upload_files=None, expect_errors=False, + content_type=None): + """ + Do a PUT request. Very like the ``.put()`` method. + ``params`` are put in the body of the request, if params is a + tuple, dictionary, list, or iterator it will be urlencoded and + placed in the body as with a POST, if it is string it will not + be encoded, but placed in the body directly. + + Returns a ``webob.Response`` object. + """ + return self._gen_request('PUT', url, params=params, headers=headers, + extra_environ=extra_environ,status=status, + upload_files=upload_files, + expect_errors=expect_errors, + content_type=content_type) + + def delete(self, url, params='', headers=None, extra_environ=None, + status=None, expect_errors=False): + """ + Do a DELETE request. Very like the ``.get()`` method. + + Returns a ``webob.Response`` object. + """ + if params: + warnings.warn(('You are not supposed to send a body in a ' + 'DELETE request. Most web servers will ignore it'), + lint.WSGIWarning) + return self._gen_request('DELETE', url, params=params, headers=headers, + extra_environ=extra_environ,status=status, + upload_files=None, expect_errors=expect_errors) + + def head(self, url, headers=None, extra_environ=None, + status=None, expect_errors=False): + """ + Do a HEAD request. Very like the ``.get()`` method. + + Returns a ``webob.Response`` object. + """ + return self._gen_request('HEAD', url, headers=headers, + extra_environ=extra_environ,status=status, + upload_files=None, expect_errors=expect_errors) + + def encode_multipart(self, params, files): + """ + Encodes a set of parameters (typically a name/value list) and + a set of files (a list of (name, filename, file_body)) into a + typical POST body, returning the (content_type, body). + """ + boundary = '----------a_BoUnDaRy%s$' % random.random() + lines = [] + for key, value in params: + lines.append('--'+boundary) + lines.append('Content-Disposition: form-data; name="%s"' % key) + lines.append('') + lines.append(value) + for file_info in files: + key, filename, value = self._get_file_info(file_info) + lines.append('--'+boundary) + lines.append('Content-Disposition: form-data; name="%s"; filename="%s"' + % (key, filename)) + fcontent = mimetypes.guess_type(filename)[0] + lines.append('Content-Type: %s' % + (fcontent or 'application/octet-stream')) + lines.append('') + lines.append(value) + lines.append('--' + boundary + '--') + lines.append('') + body = '\r\n'.join(lines) + content_type = 'multipart/form-data; boundary=%s' % boundary + return content_type, body + + def _get_file_info(self, file_info): + if len(file_info) == 2: + # It only has a filename + filename = file_info[1] + if self.relative_to: + filename = os.path.join(self.relative_to, filename) + f = open(filename, 'rb') + content = f.read() + f.close() + return (file_info[0], filename, content) + elif len(file_info) == 3: + return file_info + else: + raise ValueError( + "upload_files need to be a list of tuples of (fieldname, " + "filename, filecontent) or (fieldname, filename); " + "you gave: %r" + % repr(file_info)[:100]) + + + def request(self, url_or_req, status=None, expect_errors=False, + **req_params): + """ + Creates and executes a request. You may either pass in an + instantiated :class:`TestRequest` object, or you may pass in a + URL and keyword arguments to be passed to + :meth:`TestRequest.blank`. + + You can use this to run a request without the intermediary + functioning of :meth:`TestApp.get` etc. For instance, to + test a WebDAV method:: + + resp = app.request('/new-col', method='MKCOL') + + Note that the request won't have a body unless you specify it, + like:: + + resp = app.request('/test.txt', method='PUT', body='test') + + You can use ``POST={args}`` to set the request body to the + serialized arguments, and simultaneously set the request + method to ``POST`` + """ + if isinstance(url_or_req, basestring): + req = self.RequestClass.blank(url_or_req, **req_params) + else: + req = url_or_req.copy() + for name, value in req_params.iteritems(): + setattr(req, name, value) + if req.content_length == -1: + req.content_length = len(req.body) + req.environ['paste.throw_errors'] = True + for name, value in self.extra_environ.iteritems(): + req.environ.setdefault(name, value) + return self.do_request(req, status=status, expect_errors=expect_errors) + + def do_request(self, req, status, expect_errors): + """ + Executes the given request (``req``), with the expected + ``status``. Generally ``.get()`` and ``.post()`` are used + instead. + + To use this:: + + resp = app.do_request(webtest.TestRequest.blank( + 'url', ...args...)) + + Note you can pass any keyword arguments to + ``TestRequest.blank()``, which will be set on the request. + These can be arguments like ``content_type``, ``accept``, etc. + """ + __tracebackhide__ = True + errors = StringIO() + req.environ['wsgi.errors'] = errors + script_name = req.environ.get('SCRIPT_NAME', '') + if script_name and req.path_info.startswith(script_name): + req.path_info = req.path_info[len(script_name):] + if self.cookies: + cookie_header = ''.join([ + '%s=%s; ' % (name, cookie_quote(value)) + for name, value in self.cookies.items()]) + req.environ['HTTP_COOKIE'] = cookie_header + req.environ['paste.testing'] = True + req.environ['paste.testing_variables'] = {} + app = lint.middleware(self.app) + old_stdout = sys.stdout + out = CaptureStdout(old_stdout) + try: + sys.stdout = out + start_time = time.time() + ## FIXME: should it be an option to not catch exc_info? + res = req.get_response(app, catch_exc_info=True) + res._use_unicode = self.use_unicode + end_time = time.time() + finally: + sys.stdout = old_stdout + res.app = app + res.test_app = self + # We do this to make sure the app_iter is exausted: + res.body + res.errors = errors.getvalue() + total_time = end_time - start_time + for name, value in req.environ['paste.testing_variables'].items(): + if hasattr(res, name): + raise ValueError( + "paste.testing_variables contains the variable %r, but " + "the response object already has an attribute by that " + "name" % name) + setattr(res, name, value) + if not expect_errors: + self._check_status(status, res) + self._check_errors(res) + res.cookies_set = {} + for header in res.headers.getall('set-cookie'): + try: + c = SimpleCookie(header) + except CookieError: + raise CookieError( + "Could not parse cookie header %r" % (header,)) + for key, morsel in c.items(): + self.cookies[key] = morsel.value + res.cookies_set[key] = morsel.value + return res + + def _check_status(self, status, res): + __tracebackhide__ = True + if status == '*': + return + if (isinstance(status, basestring) + and '*' in status): + if re.match(fnmatch.translate(status), res.status, re.I): + return + if isinstance(status, (list, tuple)): + if res.status_int not in status: + raise AppError( + "Bad response: %s (not one of %s for %s)\n%s" + % (res.status, ', '.join(map(str, status)), + res.request.url, res.body)) + return + if status is None: + if res.status_int >= 200 and res.status_int < 400: + return + raise AppError( + "Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s" + % (res.status, res.request.url, + res.body)) + if status != res.status_int: + raise AppError( + "Bad response: %s (not %s)" % (res.status, status)) + + def _check_errors(self, res): + errors = res.errors + if errors: + raise AppError( + "Application had errors logged:\n%s" % errors) + + +######################################## +## Form objects +######################################## + + +_attr_re = re.compile(r'([^= \n\r\t]+)[ \n\r\t]*(?:=[ \n\r\t]*(?:"([^"]*)"|\'([^\']*)\'|([^"\'][^ \n\r\t>]*)))?', re.S) + +def _parse_attrs(text): + attrs = {} + for match in _attr_re.finditer(text): + attr_name = match.group(1).lower() + attr_body = match.group(2) or match.group(3) + attr_body = html_unquote(attr_body or '') + # python <= 2.5 doesn't like **dict when the keys are unicode + # so cast str on them. Unicode field attributes are not + # supported now (actually they have never been supported). + attrs[str(attr_name)] = attr_body + return attrs + +class Field(object): + + """ + Field object. + """ + + # Dictionary of field types (select, radio, etc) to classes + classes = {} + + settable = True + + def __init__(self, form, tag, name, pos, + value=None, id=None, **attrs): + self.form = form + self.tag = tag + self.name = name + self.pos = pos + self._value = value + self.id = id + self.attrs = attrs + + def value__set(self, value): + if not self.settable: + raise AttributeError( + "You cannot set the value of the <%s> field %r" + % (self.tag, self.name)) + self._value = value + + def force_value(self, value): + """ + Like setting a value, except forces it even for, say, hidden + fields. + """ + self._value = value + + def value__get(self): + return self._value + + value = property(value__get, value__set) + +class NoValue(object): + pass + +class Select(Field): + + """ + Field representing ``<select>`` + """ + + def __init__(self, *args, **attrs): + super(Select, self).__init__(*args, **attrs) + self.options = [] + # Undetermined yet: + self.selectedIndex = None + # we have no forced value + self._forced_value = NoValue + + def force_value(self, value): + self._forced_value = value + + def value__set(self, value): + if self._forced_value is not NoValue: + self._forced_value = NoValue + for i, (option, checked) in enumerate(self.options): + if option == _stringify(value): + self.selectedIndex = i + break + else: + raise ValueError( + "Option %r not found (from %s)" + % (value, ', '.join( + [repr(o) for o, c in self.options]))) + + def value__get(self): + if self._forced_value is not NoValue: + return self._forced_value + elif self.selectedIndex is not None: + return self.options[self.selectedIndex][0] + else: + for option, checked in self.options: + if checked: + return option + else: + if self.options: + return self.options[0][0] + else: + return None + + value = property(value__get, value__set) + +Field.classes['select'] = Select + +class MultipleSelect(Field): + + """ + Field representing ``<select multiple="multiple">`` + """ + + def __init__(self, *args, **attrs): + super(MultipleSelect, self).__init__(*args, **attrs) + self.options = [] + # Undetermined yet: + self.selectedIndices = [] + self._forced_values = [] + + def force_value(self, values): + self._forced_values = values + self.selectedIndices = [] + + def value__set(self, values): + str_values = [_stringify(value) for value in values] + self.selectedIndicies = [] + for i, (option, checked) in enumerate(self.options): + if option in str_values: + self.selectedIndices.append(i) + str_values.remove(option) + if str_values: + raise ValueError( + "Option(s) %r not found (from %s)" + % (', '.join(str_values), + ', '.join( + [repr(o) for o, c in self.options]))) + + def value__get(self): + selected_values = [] + if self.selectedIndices: + selected_values = [self.options[i][0] for i in self.selectedIndices] + elif not self._forced_values: + selected_values = [] + for option, checked in self.options: + if checked: + selected_values.append(option) + if self._forced_values: + selected_values += self._forced_values + + if self.options and (not selected_values): + selected_values = None + return selected_values + value = property(value__get, value__set) + +Field.classes['multiple_select'] = MultipleSelect + +class Radio(Select): + + """ + Field representing ``<input type="radio">`` + """ + + def value__get(self): + if self.selectedIndex is not None: + return self.options[self.selectedIndex][0] + else: + for option, checked in self.options: + if checked: + return option + else: + return None + + value = property(value__get, Select.value__set) + + +Field.classes['radio'] = Radio + +class Checkbox(Field): + + """ + Field representing ``<input type="checkbox">`` + """ + + def __init__(self, *args, **attrs): + super(Checkbox, self).__init__(*args, **attrs) + self.checked = 'checked' in attrs + + def value__set(self, value): + self.checked = not not value + + def value__get(self): + if self.checked: + if self._value is None: + return 'on' + else: + return self._value + else: + return None + + value = property(value__get, value__set) + +Field.classes['checkbox'] = Checkbox + +class Text(Field): + """ + Field representing ``<input type="text">`` + """ + + def value__get(self): + if self._value is None: + return '' + else: + return self._value + + value = property(value__get, Field.value__set) + +Field.classes['text'] = Text + + +class File(Field): + """ + Field representing ``<input type="file">`` + """ + + ## FIXME: This doesn't actually handle file uploads and enctype + def value__get(self): + if self._value is None: + return '' + else: + return self._value + + value = property(value__get, Field.value__set) + +Field.classes['file'] = File + +class Textarea(Text): + """ + Field representing ``<textarea>`` + """ + +Field.classes['textarea'] = Textarea + +class Hidden(Text): + """ + Field representing ``<input type="hidden">`` + """ + +Field.classes['hidden'] = Hidden + +class Submit(Field): + """ + Field representing ``<input type="submit">`` and ``<button>`` + """ + + settable = False + + def value__get(self): + return None + + value = property(value__get) + + def value_if_submitted(self): + return self._value + +Field.classes['submit'] = Submit + +Field.classes['button'] = Submit + +Field.classes['image'] = Submit + +class Form(object): + + """ + This object represents a form that has been found in a page. + This has a couple useful attributes: + + ``text``: + the full HTML of the form. + + ``action``: + the relative URI of the action. + + ``method``: + the method (e.g., ``'GET'``). + + ``id``: + the id, or None if not given. + + ``fields``: + a dictionary of fields, each value is a list of fields by + that name. ``<input type=\"radio\">`` and ``<select>`` are + both represented as single fields with multiple options. + """ + + # @@: This really should be using Mechanize/ClientForm or + # something... + + _tag_re = re.compile(r'<(/?)([a-z0-9_\-]*)([^>]*?)>', re.I) + + FieldClass = Field + + def __init__(self, response, text): + self.response = response + self.text = text + self._parse_fields() + self._parse_action() + + def _parse_fields(self): + in_select = None + in_textarea = None + fields = {} + for match in self._tag_re.finditer(self.text): + end = match.group(1) == '/' + tag = match.group(2).lower() + if tag not in ('input', 'select', 'option', 'textarea', + 'button'): + continue + if tag == 'select' and end: + assert in_select, ( + '%r without starting select' % match.group(0)) + in_select = None + continue + if tag == 'textarea' and end: + assert in_textarea, ( + "</textarea> with no <textarea> at %s" % match.start()) + in_textarea[0].value = html_unquote(self.text[in_textarea[1]:match.start()]) + in_textarea = None + continue + if end: + continue + attrs = _parse_attrs(match.group(3)) + if 'name' in attrs: + name = attrs.pop('name') + else: + name = None + if tag == 'option': + in_select.options.append((attrs.get('value'), + 'selected' in attrs)) + continue + if tag == 'input' and attrs.get('type') == 'radio': + field = fields.get(name) + if not field: + field = self.FieldClass.classes['radio'](self, tag, name, match.start(), **attrs) + fields.setdefault(name, []).append(field) + else: + field = field[0] + assert isinstance(field, self.FieldClass.classes['radio']) + field.options.append((attrs.get('value'), + 'checked' in attrs)) + continue + tag_type = tag + if tag == 'input': + tag_type = attrs.get('type', 'text').lower() + if tag_type == "select" and attrs.get("multiple"): + FieldClass = self.FieldClass.classes.get("multiple_select", self.FieldClass) + else: + FieldClass = self.FieldClass.classes.get(tag_type, self.FieldClass) + field = FieldClass(self, tag, name, match.start(), **attrs) + if tag == 'textarea': + assert not in_textarea, ( + "Nested textareas: %r and %r" + % (in_textarea, match.group(0))) + in_textarea = field, match.end() + elif tag == 'select': + assert not in_select, ( + "Nested selects: %r and %r" + % (in_select, match.group(0))) + in_select = field + fields.setdefault(name, []).append(field) + self.fields = fields + + def _parse_action(self): + self.action = None + for match in self._tag_re.finditer(self.text): + end = match.group(1) == '/' + tag = match.group(2).lower() + if tag != 'form': + continue + if end: + break + attrs = _parse_attrs(match.group(3)) + self.action = attrs.get('action', '') + self.method = attrs.get('method', 'GET') + self.id = attrs.get('id') + self.enctype = attrs.get('enctype', 'application/x-www-form-urlencoded') + else: + assert 0, "No </form> tag found" + assert self.action is not None, ( + "No <form> tag found") + + def __setitem__(self, name, value): + """ + Set the value of the named field. If there is 0 or multiple + fields by that name, it is an error. + + Setting the value of a ``<select>`` selects the given option + (and confirms it is an option). Setting radio fields does the + same. Checkboxes get boolean values. You cannot set hidden + fields or buttons. + + Use ``.set()`` if there is any ambiguity and you must provide + an index. + """ + fields = self.fields.get(name) + assert fields is not None, ( + "No field by the name %r found (fields: %s)" + % (name, ', '.join(map(repr, self.fields.keys())))) + assert len(fields) == 1, ( + "Multiple fields match %r: %s" + % (name, ', '.join(map(repr, fields)))) + fields[0].value = value + + def __getitem__(self, name): + """ + Get the named field object (ambiguity is an error). + """ + fields = self.fields.get(name) + assert fields is not None, ( + "No field by the name %r found" % name) + assert len(fields) == 1, ( + "Multiple fields match %r: %s" + % (name, ', '.join(map(repr, fields)))) + return fields[0] + + def set(self, name, value, index=None): + """ + Set the given name, using ``index`` to disambiguate. + """ + if index is None: + self[name] = value + else: + fields = self.fields.get(name) + assert fields is not None, ( + "No fields found matching %r" % name) + field = fields[index] + field.value = value + + def get(self, name, index=None, default=NoDefault): + """ + Get the named/indexed field object, or ``default`` if no field + is found. + """ + fields = self.fields.get(name) + if fields is None and default is not NoDefault: + return default + if index is None: + return self[name] + else: + fields = self.fields.get(name) + assert fields is not None, ( + "No fields found matching %r" % name) + field = fields[index] + return field + + def select(self, name, value, index=None): + """ + Like ``.set()``, except also confirms the target is a + ``<select>``. + """ + field = self.get(name, index=index) + assert isinstance(field, Select) + field.value = value + + def submit(self, name=None, index=None, **args): + """ + Submits the form. If ``name`` is given, then also select that + button (using ``index`` to disambiguate)``. + + Any extra keyword arguments are passed to the ``.get()`` or + ``.post()`` method. + + Returns a :class:`webtest.TestResponse` object. + """ + fields = self.submit_fields(name, index=index) + uploads = self.upload_fields() + if uploads: + args["upload_files"] = uploads + if self.method != "GET": + args.setdefault("content_type", self.enctype) + return self.response.goto(self.action, method=self.method, + params=fields, **args) + + def upload_fields(self): + """ + Return a list of file field tuples of the form: + (field name, file name) + or + (field name, file name, file contents). + """ + uploads = [] + for name, fields in self.fields.items(): + for field in fields: + if isinstance(field, File) and field.value: + uploads.append([name] + list(field.value)) + return uploads + + def submit_fields(self, name=None, index=None): + """ + Return a list of ``[(name, value), ...]`` for the current + state of the form. + """ + submit = [] + if name is not None: + field = self.get(name, index=index) + submit.append((field.name, field.value_if_submitted())) + for name, fields in self.fields.items(): + if name is None: + continue + for field in fields: + value = field.value + if value is None: + continue + if isinstance(field, File): + # skip file uploads; they need to be accounted + # for differently + continue + if isinstance(value, list): + for item in value: + submit.append((name, item)) + else: + submit.append((name, value)) + return submit + +######################################## +## Utility functions +######################################## + +def _stringify(value): + if isinstance(value, unicode): + return value + return str(value) + +def _popget(d, key, default=None): + """ + Pop the key if found (else return default) + """ + if key in d: + return d.pop(key) + return default + +def _space_prefix(pref, full, sep=None, indent=None, include_sep=True): + """ + Anything shared by pref and full will be replaced with spaces + in full, and full returned. + """ + if sep is None: + sep = os.path.sep + pref = pref.split(sep) + full = full.split(sep) + padding = [] + while pref and full and pref[0] == full[0]: + if indent is None: + padding.append(' ' * (len(full[0]) + len(sep))) + else: + padding.append(' ' * indent) + full.pop(0) + pref.pop(0) + if padding: + if include_sep: + return ''.join(padding) + sep + sep.join(full) + else: + return ''.join(padding) + sep.join(full) + else: + return sep.join(full) + +def _make_pattern(pat): + if pat is None: + return None + if isinstance(pat, (str, unicode)): + pat = re.compile(pat) + if hasattr(pat, 'search'): + return pat.search + if callable(pat): + return pat + assert 0, ( + "Cannot make callable pattern object out of %r" % pat) + +def html_unquote(v): + """ + Unquote (some) entities in HTML. (incomplete) + """ + for ent, repl in [(' ', ' '), ('>', '>'), + ('<', '<'), ('"', '"'), + ('&', '&')]: + v = v.replace(ent, repl) + return v |
