summaryrefslogtreecommitdiff
path: root/tests/testutils
diff options
context:
space:
mode:
authorValentin David <valentin.david@codethink.co.uk>2018-10-29 14:43:52 +0100
committerValentin David <valentin.david@codethink.co.uk>2018-11-29 14:18:17 +0100
commit2e78e0d12bef378a2ea8a1ae4381b5ca0a43c657 (patch)
tree8238b5947dad4023e1c69f1bb8facbedf7b8dfe2 /tests/testutils
parent4a8d0565d4e26c9d9cbbb7d3da24a0cf40ec489c (diff)
downloadbuildstream-2e78e0d12bef378a2ea8a1ae4381b5ca0a43c657.tar.gz
Add support for .netrc in remote/tar/zip plugins
Fixes #723.
Diffstat (limited to 'tests/testutils')
-rw-r--r--tests/testutils/file_server.py19
-rw-r--r--tests/testutils/ftp_server.py32
-rw-r--r--tests/testutils/http_server.py117
3 files changed, 168 insertions, 0 deletions
diff --git a/tests/testutils/file_server.py b/tests/testutils/file_server.py
new file mode 100644
index 000000000..05f896013
--- /dev/null
+++ b/tests/testutils/file_server.py
@@ -0,0 +1,19 @@
+from contextlib import contextmanager
+
+from .ftp_server import SimpleFtpServer
+from .http_server import SimpleHttpServer
+
+
+@contextmanager
+def create_file_server(file_server_type):
+ if file_server_type == 'FTP':
+ server = SimpleFtpServer()
+ elif file_server_type == 'HTTP':
+ server = SimpleHttpServer()
+ else:
+ assert False
+
+ try:
+ yield server
+ finally:
+ server.stop()
diff --git a/tests/testutils/ftp_server.py b/tests/testutils/ftp_server.py
new file mode 100644
index 000000000..52c05f8ba
--- /dev/null
+++ b/tests/testutils/ftp_server.py
@@ -0,0 +1,32 @@
+import multiprocessing
+
+from pyftpdlib.authorizers import DummyAuthorizer
+from pyftpdlib.handlers import FTPHandler
+from pyftpdlib.servers import FTPServer
+
+
+class SimpleFtpServer(multiprocessing.Process):
+ def __init__(self):
+ super().__init__()
+ self.authorizer = DummyAuthorizer()
+ handler = FTPHandler
+ handler.authorizer = self.authorizer
+ self.server = FTPServer(('127.0.0.1', 0), handler)
+
+ def run(self):
+ self.server.serve_forever()
+
+ def stop(self):
+ self.server.close_all()
+ self.server.close()
+ self.terminate()
+ self.join()
+
+ def allow_anonymous(self, cwd):
+ self.authorizer.add_anonymous(cwd)
+
+ def add_user(self, user, password, cwd):
+ self.authorizer.add_user(user, password, cwd, perm='elradfmwMT')
+
+ def base_url(self):
+ return 'ftp://127.0.0.1:{}'.format(self.server.address[1])
diff --git a/tests/testutils/http_server.py b/tests/testutils/http_server.py
new file mode 100644
index 000000000..129003836
--- /dev/null
+++ b/tests/testutils/http_server.py
@@ -0,0 +1,117 @@
+import multiprocessing
+import os
+import posixpath
+import html
+import threading
+import base64
+from http.server import SimpleHTTPRequestHandler, HTTPServer, HTTPStatus
+
+
+class Unauthorized(Exception):
+ pass
+
+
+class RequestHandler(SimpleHTTPRequestHandler):
+
+ def get_root_dir(self):
+ authorization = self.headers.get('authorization')
+ if not authorization:
+ if not self.server.anonymous_dir:
+ raise Unauthorized('unauthorized')
+ return self.server.anonymous_dir
+ else:
+ authorization = authorization.split()
+ if len(authorization) != 2 or authorization[0].lower() != 'basic':
+ raise Unauthorized('unauthorized')
+ try:
+ decoded = base64.decodebytes(authorization[1].encode('ascii'))
+ user, password = decoded.decode('ascii').split(':')
+ expected_password, directory = self.server.users[user]
+ if password == expected_password:
+ return directory
+ except:
+ raise Unauthorized('unauthorized')
+ return None
+
+ def unauthorized(self):
+ shortmsg, longmsg = self.responses[HTTPStatus.UNAUTHORIZED]
+ self.send_response(HTTPStatus.UNAUTHORIZED, shortmsg)
+ self.send_header('Connection', 'close')
+
+ content = (self.error_message_format % {
+ 'code': HTTPStatus.UNAUTHORIZED,
+ 'message': html.escape(longmsg, quote=False),
+ 'explain': html.escape(longmsg, quote=False)
+ })
+ body = content.encode('UTF-8', 'replace')
+ self.send_header('Content-Type', self.error_content_type)
+ self.send_header('Content-Length', str(len(body)))
+ self.send_header('WWW-Authenticate', 'Basic realm="{}"'.format(self.server.realm))
+ self.end_headers()
+ self.end_headers()
+
+ if self.command != 'HEAD' and body:
+ self.wfile.write(body)
+
+ def do_GET(self):
+ try:
+ super().do_GET()
+ except Unauthorized:
+ self.unauthorized()
+
+ def do_HEAD(self):
+ try:
+ super().do_HEAD()
+ except Unauthorized:
+ self.unauthorized()
+
+ def translate_path(self, path):
+ path = path.split('?', 1)[0]
+ path = path.split('#', 1)[0]
+ path = posixpath.normpath(path)
+ assert(posixpath.isabs(path))
+ path = posixpath.relpath(path, '/')
+ return os.path.join(self.get_root_dir(), path)
+
+
+class AuthHTTPServer(HTTPServer):
+ def __init__(self, *args, **kwargs):
+ self.users = {}
+ self.anonymous_dir = None
+ self.realm = 'Realm'
+ super().__init__(*args, **kwargs)
+
+
+class SimpleHttpServer(multiprocessing.Process):
+ def __init__(self):
+ self.__stop = multiprocessing.Queue()
+ super().__init__()
+ self.server = AuthHTTPServer(('127.0.0.1', 0), RequestHandler)
+ self.started = False
+
+ def start(self):
+ self.started = True
+ super().start()
+
+ def run(self):
+ t = threading.Thread(target=self.server.serve_forever)
+ t.start()
+ self.__stop.get()
+ self.server.shutdown()
+ t.join()
+
+ def stop(self):
+ if not self.started:
+ return
+ self.__stop.put(None)
+ self.terminate()
+ self.join()
+
+ def allow_anonymous(self, cwd):
+ self.server.anonymous_dir = cwd
+
+ def add_user(self, user, password, cwd):
+ self.server.users[user] = (password, cwd)
+
+ def base_url(self):
+ return 'http://127.0.0.1:{}'.format(self.server.server_port)