diff options
author | Valentin David <valentin.david@codethink.co.uk> | 2018-10-29 14:43:52 +0100 |
---|---|---|
committer | Valentin David <valentin.david@codethink.co.uk> | 2018-11-29 14:18:17 +0100 |
commit | 2e78e0d12bef378a2ea8a1ae4381b5ca0a43c657 (patch) | |
tree | 8238b5947dad4023e1c69f1bb8facbedf7b8dfe2 /tests/testutils | |
parent | 4a8d0565d4e26c9d9cbbb7d3da24a0cf40ec489c (diff) | |
download | buildstream-2e78e0d12bef378a2ea8a1ae4381b5ca0a43c657.tar.gz |
Add support for .netrc in remote/tar/zip plugins
Fixes #723.
Diffstat (limited to 'tests/testutils')
-rw-r--r-- | tests/testutils/file_server.py | 19 | ||||
-rw-r--r-- | tests/testutils/ftp_server.py | 32 | ||||
-rw-r--r-- | tests/testutils/http_server.py | 117 |
3 files changed, 168 insertions, 0 deletions
diff --git a/tests/testutils/file_server.py b/tests/testutils/file_server.py new file mode 100644 index 000000000..05f896013 --- /dev/null +++ b/tests/testutils/file_server.py @@ -0,0 +1,19 @@ +from contextlib import contextmanager + +from .ftp_server import SimpleFtpServer +from .http_server import SimpleHttpServer + + +@contextmanager +def create_file_server(file_server_type): + if file_server_type == 'FTP': + server = SimpleFtpServer() + elif file_server_type == 'HTTP': + server = SimpleHttpServer() + else: + assert False + + try: + yield server + finally: + server.stop() diff --git a/tests/testutils/ftp_server.py b/tests/testutils/ftp_server.py new file mode 100644 index 000000000..52c05f8ba --- /dev/null +++ b/tests/testutils/ftp_server.py @@ -0,0 +1,32 @@ +import multiprocessing + +from pyftpdlib.authorizers import DummyAuthorizer +from pyftpdlib.handlers import FTPHandler +from pyftpdlib.servers import FTPServer + + +class SimpleFtpServer(multiprocessing.Process): + def __init__(self): + super().__init__() + self.authorizer = DummyAuthorizer() + handler = FTPHandler + handler.authorizer = self.authorizer + self.server = FTPServer(('127.0.0.1', 0), handler) + + def run(self): + self.server.serve_forever() + + def stop(self): + self.server.close_all() + self.server.close() + self.terminate() + self.join() + + def allow_anonymous(self, cwd): + self.authorizer.add_anonymous(cwd) + + def add_user(self, user, password, cwd): + self.authorizer.add_user(user, password, cwd, perm='elradfmwMT') + + def base_url(self): + return 'ftp://127.0.0.1:{}'.format(self.server.address[1]) diff --git a/tests/testutils/http_server.py b/tests/testutils/http_server.py new file mode 100644 index 000000000..129003836 --- /dev/null +++ b/tests/testutils/http_server.py @@ -0,0 +1,117 @@ +import multiprocessing +import os +import posixpath +import html +import threading +import base64 +from http.server import SimpleHTTPRequestHandler, HTTPServer, HTTPStatus + + +class Unauthorized(Exception): + pass + + +class RequestHandler(SimpleHTTPRequestHandler): + + def get_root_dir(self): + authorization = self.headers.get('authorization') + if not authorization: + if not self.server.anonymous_dir: + raise Unauthorized('unauthorized') + return self.server.anonymous_dir + else: + authorization = authorization.split() + if len(authorization) != 2 or authorization[0].lower() != 'basic': + raise Unauthorized('unauthorized') + try: + decoded = base64.decodebytes(authorization[1].encode('ascii')) + user, password = decoded.decode('ascii').split(':') + expected_password, directory = self.server.users[user] + if password == expected_password: + return directory + except: + raise Unauthorized('unauthorized') + return None + + def unauthorized(self): + shortmsg, longmsg = self.responses[HTTPStatus.UNAUTHORIZED] + self.send_response(HTTPStatus.UNAUTHORIZED, shortmsg) + self.send_header('Connection', 'close') + + content = (self.error_message_format % { + 'code': HTTPStatus.UNAUTHORIZED, + 'message': html.escape(longmsg, quote=False), + 'explain': html.escape(longmsg, quote=False) + }) + body = content.encode('UTF-8', 'replace') + self.send_header('Content-Type', self.error_content_type) + self.send_header('Content-Length', str(len(body))) + self.send_header('WWW-Authenticate', 'Basic realm="{}"'.format(self.server.realm)) + self.end_headers() + self.end_headers() + + if self.command != 'HEAD' and body: + self.wfile.write(body) + + def do_GET(self): + try: + super().do_GET() + except Unauthorized: + self.unauthorized() + + def do_HEAD(self): + try: + super().do_HEAD() + except Unauthorized: + self.unauthorized() + + def translate_path(self, path): + path = path.split('?', 1)[0] + path = path.split('#', 1)[0] + path = posixpath.normpath(path) + assert(posixpath.isabs(path)) + path = posixpath.relpath(path, '/') + return os.path.join(self.get_root_dir(), path) + + +class AuthHTTPServer(HTTPServer): + def __init__(self, *args, **kwargs): + self.users = {} + self.anonymous_dir = None + self.realm = 'Realm' + super().__init__(*args, **kwargs) + + +class SimpleHttpServer(multiprocessing.Process): + def __init__(self): + self.__stop = multiprocessing.Queue() + super().__init__() + self.server = AuthHTTPServer(('127.0.0.1', 0), RequestHandler) + self.started = False + + def start(self): + self.started = True + super().start() + + def run(self): + t = threading.Thread(target=self.server.serve_forever) + t.start() + self.__stop.get() + self.server.shutdown() + t.join() + + def stop(self): + if not self.started: + return + self.__stop.put(None) + self.terminate() + self.join() + + def allow_anonymous(self, cwd): + self.server.anonymous_dir = cwd + + def add_user(self, user, password, cwd): + self.server.users[user] = (password, cwd) + + def base_url(self): + return 'http://127.0.0.1:{}'.format(self.server.server_port) |