diff options
author | Stefan Behnel <stefan_ml@behnel.de> | 2013-04-28 13:16:58 +0200 |
---|---|---|
committer | Stefan Behnel <stefan_ml@behnel.de> | 2013-04-28 13:16:58 +0200 |
commit | c6f1b6fd48c2da36ac847e1013c1282c901e41e0 (patch) | |
tree | eb72374eaa72b72e11cad585514e157d86ce18a7 | |
parent | 3256ccb8eec06e782fdcc6f39e49dfa4c7896a17 (diff) | |
download | python-lxml-c6f1b6fd48c2da36ac847e1013c1282c901e41e0.tar.gz |
add tests for parsing from HTTP URLs
-rw-r--r-- | src/lxml/tests/dummy_http_server.py | 71 | ||||
-rw-r--r-- | src/lxml/tests/test_http_io.py | 65 | ||||
-rw-r--r-- | test.py | 6 |
3 files changed, 140 insertions, 2 deletions
diff --git a/src/lxml/tests/dummy_http_server.py b/src/lxml/tests/dummy_http_server.py new file mode 100644 index 00000000..0faa6588 --- /dev/null +++ b/src/lxml/tests/dummy_http_server.py @@ -0,0 +1,71 @@ +""" +Simple HTTP request dumper for tests in Python 2.5+. +""" + +import sys +import urlparse +from contextlib import contextmanager + +@contextmanager +def webserver(app, port=0, host=None): + """Context manager entry point for the 'with' statement. + + Pass 0 as port number to dynamically allocate a free port. + + Usage: + + with webserver(wsgi_app_function, 8080) as host_url: + do_ws_calls(host_url) + """ + server = build_web_server(app, port, host or '127.0.0.1') + host, port = server.socket.getsockname() + + import threading + thread = threading.Thread(target=server.serve_forever, + kwargs={'poll_interval': 0.5}) + thread.setDaemon(True) + thread.start() + try: + yield 'http://%s:%s/' % (host, port) # yield control to 'with' body + finally: + server.shutdown() + + +from SocketServer import ThreadingMixIn +import wsgiref.simple_server as wsgiserver +class WebServer(wsgiserver.WSGIServer, ThreadingMixIn): + """A web server that starts a new thread for each request. + """ + + +class _RequestHandler(wsgiserver.WSGIRequestHandler): + def get_stderr(self): + # don't write to stderr + return sys.stdout + + def log_message(self, format, *args): + # message = "wsmock(%s) %s" % (self.address_string(), format % args) + pass # don't log messages + + +def build_web_server(app, port, host=None): + server = wsgiserver.make_server( + host or '', port, app, + server_class=WebServer, + handler_class=_RequestHandler) + return server + + +class HTTPRequestCollector(object): + def __init__(self, response_data, response_code=200, headers=()): + self.requests = [] + self.response_code = response_code + self.response_data = response_data + self.headers = list(headers or ()) + + def __call__(self, environ, start_response): + self.requests.append(( + environ.get('PATH_INFO'), + urlparse.parse_qsl(environ.get('QUERY_STRING')))) + start_response('%s OK' % self.response_code, self.headers) + return [self.response_data] diff --git a/src/lxml/tests/test_http_io.py b/src/lxml/tests/test_http_io.py new file mode 100644 index 00000000..ba873d73 --- /dev/null +++ b/src/lxml/tests/test_http_io.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- + +""" +Web IO test cases that need Python 2.5+ (wsgiref) +""" + +import unittest +import os +import sys +import gzip + +this_dir = os.path.dirname(__file__) +if this_dir not in sys.path: + sys.path.insert(0, this_dir) # needed for Py3 + +from .common_imports import ( + etree, HelperTestCase, BytesIO, _bytes) +from .dummy_http_server import webserver, HTTPRequestCollector + + +class HttpIOTestCase(HelperTestCase): + etree = etree + + def _parse_from_http(self, data, code=200, headers=None): + handler = HTTPRequestCollector(data, code, headers) + with webserver(handler) as host_url: + tree = self.etree.parse(host_url + 'TEST') + self.assertEqual([('/TEST', [])], handler.requests) + return tree + + def test_http_client(self): + tree = self._parse_from_http(_bytes('<root><a/></root>')) + self.assertEqual('root', tree.getroot().tag) + self.assertEqual('a', tree.getroot()[0].tag) + + def test_http_client_404(self): + try: + self._parse_from_http(_bytes('<root/>'), code=404) + except IOError: + self.assertTrue(True) + else: + self.assertTrue(False, "expected IOError") + + def test_http_client_gzip(self): + f = BytesIO() + gz = gzip.GzipFile(fileobj=f, mode='w', filename='test.xml') + gz.write(_bytes('<root><a/></root>')) + gz.close() + data = f.getvalue() + del f, gz + + headers = [('Content-Encoding', 'gzip')] + tree = self._parse_from_http(data, headers=headers) + self.assertEqual('root', tree.getroot().tag) + self.assertEqual('a', tree.getroot()[0].tag) + + +def test_suite(): + suite = unittest.TestSuite() + suite.addTests([unittest.makeSuite(HttpIOTestCase)]) + return suite + + +if __name__ == '__main__': + print('to test use test.py %s' % __file__) @@ -564,8 +564,10 @@ def main(argv): if sys.version_info[:2] < (2,5): # exclude tests that require the 'with' statement - test_files = [ test_file for test_file in test_files - if 'test_incremental_xmlfile.py' not in test_file ] + test_files = [ + test_file for test_file in test_files + if 'test_incremental_xmlfile.py' not in test_file + and 'test_http_io.py' not in test_file] if cfg.list_tests or cfg.run_tests: test_cases = get_test_cases(test_files, cfg, tracer=tracer) |