summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Behnel <stefan_ml@behnel.de>2013-04-28 13:16:58 +0200
committerStefan Behnel <stefan_ml@behnel.de>2013-04-28 13:16:58 +0200
commitc6f1b6fd48c2da36ac847e1013c1282c901e41e0 (patch)
treeeb72374eaa72b72e11cad585514e157d86ce18a7
parent3256ccb8eec06e782fdcc6f39e49dfa4c7896a17 (diff)
downloadpython-lxml-c6f1b6fd48c2da36ac847e1013c1282c901e41e0.tar.gz
add tests for parsing from HTTP URLs
-rw-r--r--src/lxml/tests/dummy_http_server.py71
-rw-r--r--src/lxml/tests/test_http_io.py65
-rw-r--r--test.py6
3 files changed, 140 insertions, 2 deletions
diff --git a/src/lxml/tests/dummy_http_server.py b/src/lxml/tests/dummy_http_server.py
new file mode 100644
index 00000000..0faa6588
--- /dev/null
+++ b/src/lxml/tests/dummy_http_server.py
@@ -0,0 +1,71 @@
+"""
+Simple HTTP request dumper for tests in Python 2.5+.
+"""
+
+import sys
+import urlparse
+from contextlib import contextmanager
+
+@contextmanager
+def webserver(app, port=0, host=None):
+ """Context manager entry point for the 'with' statement.
+
+ Pass 0 as port number to dynamically allocate a free port.
+
+ Usage:
+
+ with webserver(wsgi_app_function, 8080) as host_url:
+ do_ws_calls(host_url)
+ """
+ server = build_web_server(app, port, host or '127.0.0.1')
+ host, port = server.socket.getsockname()
+
+ import threading
+ thread = threading.Thread(target=server.serve_forever,
+ kwargs={'poll_interval': 0.5})
+ thread.setDaemon(True)
+ thread.start()
+ try:
+ yield 'http://%s:%s/' % (host, port) # yield control to 'with' body
+ finally:
+ server.shutdown()
+
+
+from SocketServer import ThreadingMixIn
+import wsgiref.simple_server as wsgiserver
+class WebServer(wsgiserver.WSGIServer, ThreadingMixIn):
+ """A web server that starts a new thread for each request.
+ """
+
+
+class _RequestHandler(wsgiserver.WSGIRequestHandler):
+ def get_stderr(self):
+ # don't write to stderr
+ return sys.stdout
+
+ def log_message(self, format, *args):
+ # message = "wsmock(%s) %s" % (self.address_string(), format % args)
+ pass # don't log messages
+
+
+def build_web_server(app, port, host=None):
+ server = wsgiserver.make_server(
+ host or '', port, app,
+ server_class=WebServer,
+ handler_class=_RequestHandler)
+ return server
+
+
+class HTTPRequestCollector(object):
+ def __init__(self, response_data, response_code=200, headers=()):
+ self.requests = []
+ self.response_code = response_code
+ self.response_data = response_data
+ self.headers = list(headers or ())
+
+ def __call__(self, environ, start_response):
+ self.requests.append((
+ environ.get('PATH_INFO'),
+ urlparse.parse_qsl(environ.get('QUERY_STRING'))))
+ start_response('%s OK' % self.response_code, self.headers)
+ return [self.response_data]
diff --git a/src/lxml/tests/test_http_io.py b/src/lxml/tests/test_http_io.py
new file mode 100644
index 00000000..ba873d73
--- /dev/null
+++ b/src/lxml/tests/test_http_io.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+
+"""
+Web IO test cases that need Python 2.5+ (wsgiref)
+"""
+
+import unittest
+import os
+import sys
+import gzip
+
+this_dir = os.path.dirname(__file__)
+if this_dir not in sys.path:
+ sys.path.insert(0, this_dir) # needed for Py3
+
+from .common_imports import (
+ etree, HelperTestCase, BytesIO, _bytes)
+from .dummy_http_server import webserver, HTTPRequestCollector
+
+
+class HttpIOTestCase(HelperTestCase):
+ etree = etree
+
+ def _parse_from_http(self, data, code=200, headers=None):
+ handler = HTTPRequestCollector(data, code, headers)
+ with webserver(handler) as host_url:
+ tree = self.etree.parse(host_url + 'TEST')
+ self.assertEqual([('/TEST', [])], handler.requests)
+ return tree
+
+ def test_http_client(self):
+ tree = self._parse_from_http(_bytes('<root><a/></root>'))
+ self.assertEqual('root', tree.getroot().tag)
+ self.assertEqual('a', tree.getroot()[0].tag)
+
+ def test_http_client_404(self):
+ try:
+ self._parse_from_http(_bytes('<root/>'), code=404)
+ except IOError:
+ self.assertTrue(True)
+ else:
+ self.assertTrue(False, "expected IOError")
+
+ def test_http_client_gzip(self):
+ f = BytesIO()
+ gz = gzip.GzipFile(fileobj=f, mode='w', filename='test.xml')
+ gz.write(_bytes('<root><a/></root>'))
+ gz.close()
+ data = f.getvalue()
+ del f, gz
+
+ headers = [('Content-Encoding', 'gzip')]
+ tree = self._parse_from_http(data, headers=headers)
+ self.assertEqual('root', tree.getroot().tag)
+ self.assertEqual('a', tree.getroot()[0].tag)
+
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTests([unittest.makeSuite(HttpIOTestCase)])
+ return suite
+
+
+if __name__ == '__main__':
+ print('to test use test.py %s' % __file__)
diff --git a/test.py b/test.py
index 008da074..5ae42e01 100644
--- a/test.py
+++ b/test.py
@@ -564,8 +564,10 @@ def main(argv):
if sys.version_info[:2] < (2,5):
# exclude tests that require the 'with' statement
- test_files = [ test_file for test_file in test_files
- if 'test_incremental_xmlfile.py' not in test_file ]
+ test_files = [
+ test_file for test_file in test_files
+ if 'test_incremental_xmlfile.py' not in test_file
+ and 'test_http_io.py' not in test_file]
if cfg.list_tests or cfg.run_tests:
test_cases = get_test_cases(test_files, cfg, tracer=tracer)