diff options
-rw-r--r-- | buildstream/_artifactcache/ostreecache.py | 378 | ||||
-rw-r--r-- | buildstream/_artifactcache/pushreceive.py | 812 | ||||
-rw-r--r-- | buildstream/_ostree.py | 241 | ||||
-rwxr-xr-x | setup.py | 43 |
4 files changed, 0 insertions, 1474 deletions
diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py deleted file mode 100644 index c802fc2e2..000000000 --- a/buildstream/_artifactcache/ostreecache.py +++ /dev/null @@ -1,378 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (C) 2017-2018 Codethink Limited -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see <http://www.gnu.org/licenses/>. -# -# Authors: -# Jürg Billeter <juerg.billeter@codethink.co.uk> - -import multiprocessing -import os -import signal -import tempfile - -from .. import _ostree, _signals, utils -from .._exceptions import ArtifactError -from .._ostree import OSTreeError - -from . import ArtifactCache -from .pushreceive import initialize_push_connection -from .pushreceive import push as push_artifact -from .pushreceive import PushException - - -# An OSTreeCache manages artifacts in an OSTree repository -# -# Args: -# context (Context): The BuildStream context -# project (Project): The BuildStream project -# enable_push (bool): Whether pushing is allowed by the platform -# -# Pushing is explicitly disabled by the platform in some cases, -# like when we are falling back to functioning without using -# user namespaces. -# -class OSTreeCache(ArtifactCache): - - def __init__(self, context, *, enable_push): - super().__init__(context) - - self.enable_push = enable_push - - ostreedir = os.path.join(context.artifactdir, 'ostree') - self.repo = _ostree.ensure(ostreedir, False) - - # Per-project list of OSTreeRemote instances. - self._remotes = {} - - self._has_fetch_remotes = False - self._has_push_remotes = False - - ################################################ - # Implementation of abstract methods # - ################################################ - def has_fetch_remotes(self, *, element=None): - if not self._has_fetch_remotes: - # No project has push remotes - return False - elif element is None: - # At least one (sub)project has fetch remotes - return True - else: - # Check whether the specified element's project has fetch remotes - remotes_for_project = self._remotes[element._get_project()] - return bool(remotes_for_project) - - def has_push_remotes(self, *, element=None): - if not self._has_push_remotes: - # No project has push remotes - return False - elif element is None: - # At least one (sub)project has push remotes - return True - else: - # Check whether the specified element's project has push remotes - remotes_for_project = self._remotes[element._get_project()] - return any(remote.spec.push for remote in remotes_for_project) - - def contains(self, element, key): - ref = self.get_artifact_fullname(element, key) - return _ostree.exists(self.repo, ref) - - def extract(self, element, key): - ref = self.get_artifact_fullname(element, key) - - # resolve ref to checksum - rev = _ostree.checksum(self.repo, ref) - - # Extracting a nonexistent artifact is a bug - assert rev, "Artifact missing for {}".format(ref) - - dest = os.path.join(self.extractdir, element._get_project().name, element.normal_name, rev) - if os.path.isdir(dest): - # artifact has already been extracted - return dest - - os.makedirs(self.extractdir, exist_ok=True) - with tempfile.TemporaryDirectory(prefix='tmp', dir=self.extractdir) as tmpdir: - - checkoutdir = os.path.join(tmpdir, ref) - - _ostree.checkout(self.repo, checkoutdir, rev, user=True) - - os.makedirs(os.path.dirname(dest), exist_ok=True) - try: - os.rename(checkoutdir, dest) - except OSError as e: - # With rename, it's possible to get either ENOTEMPTY or EEXIST - # in the case that the destination path is a not empty directory. - # - # If rename fails with these errors, another process beat - # us to it so just ignore. - if e.errno not in [os.errno.ENOTEMPTY, os.errno.EEXIST]: - raise ArtifactError("Failed to extract artifact for ref '{}': {}" - .format(ref, e)) from e - - return dest - - def commit(self, element, content, keys): - refs = [self.get_artifact_fullname(element, key) for key in keys] - - try: - _ostree.commit(self.repo, content, refs) - except OSTreeError as e: - raise ArtifactError("Failed to commit artifact: {}".format(e)) from e - - def can_diff(self): - return True - - def diff(self, element, key_a, key_b, *, subdir=None): - _, a, _ = self.repo.read_commit(self.get_artifact_fullname(element, key_a)) - _, b, _ = self.repo.read_commit(self.get_artifact_fullname(element, key_b)) - - if subdir: - a = a.get_child(subdir) - b = b.get_child(subdir) - - subpath = a.get_path() - else: - subpath = '/' - - modified, removed, added = _ostree.diff_dirs(a, b) - - modified = [os.path.relpath(item.target.get_path(), subpath) for item in modified] - removed = [os.path.relpath(item.get_path(), subpath) for item in removed] - added = [os.path.relpath(item.get_path(), subpath) for item in added] - - return modified, removed, added - - def pull(self, element, key, *, progress=None): - project = element._get_project() - - ref = self.get_artifact_fullname(element, key) - - for remote in self._remotes[project]: - try: - # fetch the artifact from highest priority remote using the specified cache key - remote_name = self._ensure_remote(self.repo, remote.pull_url) - _ostree.fetch(self.repo, remote=remote_name, ref=ref, progress=progress) - return True - except OSTreeError: - # Try next remote - continue - - return False - - def link_key(self, element, oldkey, newkey): - oldref = self.get_artifact_fullname(element, oldkey) - newref = self.get_artifact_fullname(element, newkey) - - # resolve ref to checksum - rev = _ostree.checksum(self.repo, oldref) - - # create additional ref for the same checksum - _ostree.set_ref(self.repo, newref, rev) - - def push(self, element, keys): - any_pushed = False - - project = element._get_project() - - push_remotes = [r for r in self._remotes[project] if r.spec.push] - - if not push_remotes: - raise ArtifactError("Push is not enabled for any of the configured remote artifact caches.") - - refs = [self.get_artifact_fullname(element, key) for key in keys] - - for remote in push_remotes: - any_pushed |= self._push_to_remote(remote, element, refs) - - return any_pushed - - def initialize_remotes(self, *, on_failure=None): - remote_specs = self.global_remote_specs.copy() - - for project in self.project_remote_specs: - remote_specs.extend(self.project_remote_specs[project]) - - remote_specs = list(utils._deduplicate(remote_specs)) - - remote_results = {} - - # Callback to initialize one remote in a 'multiprocessing' subprocess. - # - # We cannot do this in the main process because of the way the tasks - # run by the main scheduler calls into libostree using - # fork()-without-exec() subprocesses. OSTree fetch operations in - # subprocesses hang if fetch operations were previously done in the - # main process. - # - def child_action(url, q): - try: - push_url, pull_url = self._initialize_remote(url) - q.put((None, push_url, pull_url)) - except Exception as e: # pylint: disable=broad-except - # Whatever happens, we need to return it to the calling process - # - q.put((str(e), None, None, None)) - - # Kick off all the initialization jobs one by one. - # - # Note that we cannot use multiprocessing.Pool here because it's not - # possible to pickle local functions such as child_action(). - # - q = multiprocessing.Queue() - for remote_spec in remote_specs: - p = multiprocessing.Process(target=child_action, args=(remote_spec.url, q)) - - try: - - # Keep SIGINT blocked in the child process - with _signals.blocked([signal.SIGINT], ignore=False): - p.start() - - error, push_url, pull_url = q.get() - p.join() - except KeyboardInterrupt: - utils._kill_process_tree(p.pid) - raise - - if error and on_failure: - on_failure(remote_spec.url, error) - elif error: - raise ArtifactError(error) - else: - if remote_spec.push and push_url: - self._has_push_remotes = True - if pull_url: - self._has_fetch_remotes = True - - remote_results[remote_spec.url] = (push_url, pull_url) - - # Prepare push_urls and pull_urls for each project - for project in self.context.get_projects(): - remote_specs = self.global_remote_specs - if project in self.project_remote_specs: - remote_specs = list(utils._deduplicate(remote_specs + self.project_remote_specs[project])) - - remotes = [] - - for remote_spec in remote_specs: - # Errors are already handled in the loop above, - # skip unreachable remotes here. - if remote_spec.url not in remote_results: - continue - - push_url, pull_url = remote_results[remote_spec.url] - - if remote_spec.push and not push_url: - raise ArtifactError("Push enabled but not supported by repo at: {}".format(remote_spec.url)) - - remote = _OSTreeRemote(remote_spec, pull_url, push_url) - remotes.append(remote) - - self._remotes[project] = remotes - - ################################################ - # Local Private Methods # - ################################################ - - # _initialize_remote(): - # - # Do protocol-specific initialization necessary to use a given OSTree - # remote. - # - # The SSH protocol that we use only supports pushing so initializing these - # involves contacting the remote to find out the corresponding pull URL. - # - # Args: - # url (str): URL of the remote - # - # Returns: - # (str, str): the pull URL and push URL for the remote - # - # Raises: - # ArtifactError: if there was an error - def _initialize_remote(self, url): - if url.startswith('ssh://'): - try: - push_url = url - pull_url = initialize_push_connection(url) - except PushException as e: - raise ArtifactError(e) from e - elif url.startswith('/'): - push_url = pull_url = 'file://' + url - elif url.startswith('file://'): - push_url = pull_url = url - elif url.startswith('http://') or url.startswith('https://'): - push_url = None - pull_url = url - else: - raise ArtifactError("Unsupported URL: {}".format(url)) - - return push_url, pull_url - - # _ensure_remote(): - # - # Ensure that our OSTree repo has a remote configured for the given URL. - # Note that SSH access to remotes is not handled by libostree itself. - # - # Args: - # repo (OSTree.Repo): an OSTree repository - # pull_url (str): the URL where libostree can pull from the remote - # - # Returns: - # (str): the name of the remote, which can be passed to various other - # operations implemented by the _ostree module. - # - # Raises: - # OSTreeError: if there was a problem reported by libostree - def _ensure_remote(self, repo, pull_url): - remote_name = utils.url_directory_name(pull_url) - _ostree.configure_remote(repo, remote_name, pull_url) - return remote_name - - def _push_to_remote(self, remote, element, refs): - with utils._tempdir(dir=self.context.artifactdir, prefix='push-repo-') as temp_repo_dir: - - with element.timed_activity("Preparing compressed archive"): - # First create a temporary archive-z2 repository, we can - # only use ostree-push with archive-z2 local repo. - temp_repo = _ostree.ensure(temp_repo_dir, True) - - # Now push the ref we want to push into our temporary archive-z2 repo - for ref in refs: - _ostree.fetch(temp_repo, remote=self.repo.get_path().get_uri(), ref=ref) - - with element.timed_activity("Sending artifact"), \ - element._output_file() as output_file: - try: - pushed = push_artifact(temp_repo.get_path().get_path(), - remote.push_url, - refs, output_file) - except PushException as e: - raise ArtifactError("Failed to push artifact {}: {}".format(refs, e)) from e - - return pushed - - -# Represents a single remote OSTree cache. -# -class _OSTreeRemote(): - def __init__(self, spec, pull_url, push_url): - self.spec = spec - self.pull_url = pull_url - self.push_url = push_url diff --git a/buildstream/_artifactcache/pushreceive.py b/buildstream/_artifactcache/pushreceive.py deleted file mode 100644 index 777065e18..000000000 --- a/buildstream/_artifactcache/pushreceive.py +++ /dev/null @@ -1,812 +0,0 @@ -#!/usr/bin/python3 - -# Push OSTree commits to a remote repo, based on Dan Nicholson's ostree-push -# -# Copyright (C) 2015 Dan Nicholson <nicholson@endlessm.com> -# Copyright (C) 2017 Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import logging -import multiprocessing -import os -import re -import subprocess -import sys -import shutil -import tarfile -import tempfile -from enum import Enum -from urllib.parse import urlparse - -import click -import gi - -from .. import _signals # nopep8 -from .._profile import Topics, profile_start, profile_end - -gi.require_version('OSTree', '1.0') -# pylint: disable=wrong-import-position,wrong-import-order -from gi.repository import GLib, Gio, OSTree # nopep8 - - -PROTO_VERSION = 1 -HEADER_SIZE = 5 - - -# An error occurred -class PushException(Exception): - pass - - -# Trying to commit a ref which already exists in remote -class PushExistsException(Exception): - pass - - -class PushCommandType(Enum): - info = 0 - update = 1 - putobjects = 2 - status = 3 - done = 4 - - -def python_to_msg_byteorder(python_byteorder=sys.byteorder): - if python_byteorder == 'little': - return 'l' - elif python_byteorder == 'big': - return 'B' - else: - raise PushException('Unrecognized system byteorder {}' - .format(python_byteorder)) - - -def msg_to_python_byteorder(msg_byteorder): - if msg_byteorder == 'l': - return 'little' - elif msg_byteorder == 'B': - return 'big' - else: - raise PushException('Unrecognized message byteorder {}' - .format(msg_byteorder)) - - -def ostree_object_path(repo, obj): - repodir = repo.get_path().get_path() - return os.path.join(repodir, 'objects', obj[0:2], obj[2:]) - - -class PushCommand(object): - def __init__(self, cmdtype, args): - self.cmdtype = cmdtype - self.args = args - self.validate(self.cmdtype, self.args) - self.variant = GLib.Variant('a{sv}', self.args) - - @staticmethod - def validate(command, args): - if not isinstance(command, PushCommandType): - raise PushException('Message command must be PushCommandType') - if not isinstance(args, dict): - raise PushException('Message args must be dict') - # Ensure all values are variants for a{sv} vardict - for val in args.values(): - if not isinstance(val, GLib.Variant): - raise PushException('Message args values must be ' - 'GLib.Variant') - - -class PushMessageWriter(object): - def __init__(self, file, byteorder=sys.byteorder): - self.file = file - self.byteorder = byteorder - self.msg_byteorder = python_to_msg_byteorder(self.byteorder) - - def encode_header(self, cmdtype, size): - header = self.msg_byteorder.encode() + \ - PROTO_VERSION.to_bytes(1, self.byteorder) + \ - cmdtype.value.to_bytes(1, self.byteorder) + \ - size.to_bytes(2, self.byteorder) - return header - - def encode_message(self, command): - if not isinstance(command, PushCommand): - raise PushException('Command must be PushCommand') - data = command.variant.get_data_as_bytes() - size = data.get_size() - - # Build the header - header = self.encode_header(command.cmdtype, size) - - return header + data.get_data() - - def write(self, command): - msg = self.encode_message(command) - self.file.write(msg) - self.file.flush() - - def send_hello(self): - # The 'hello' message is used to check connectivity and discover the - # cache's pull URL. It's actually transmitted as an empty info request. - args = { - 'mode': GLib.Variant('i', 0), - 'refs': GLib.Variant('a{ss}', {}) - } - command = PushCommand(PushCommandType.info, args) - self.write(command) - - def send_info(self, repo, refs, pull_url=None): - cmdtype = PushCommandType.info - mode = repo.get_mode() - - ref_map = {} - for ref in refs: - _, checksum = repo.resolve_rev(ref, True) - if checksum: - _, has_object = repo.has_object(OSTree.ObjectType.COMMIT, checksum, None) - if has_object: - ref_map[ref] = checksum - - args = { - 'mode': GLib.Variant('i', mode), - 'refs': GLib.Variant('a{ss}', ref_map) - } - - # The server sends this so clients can discover the correct pull URL - # for this cache without requiring end-users to specify it. - if pull_url: - args['pull_url'] = GLib.Variant('s', pull_url) - - command = PushCommand(cmdtype, args) - self.write(command) - - def send_update(self, refs): - cmdtype = PushCommandType.update - args = {} - for branch, revs in refs.items(): - args[branch] = GLib.Variant('(ss)', revs) - command = PushCommand(cmdtype, args) - self.write(command) - - def send_putobjects(self, repo, objects): - - logging.info('Sending {} objects'.format(len(objects))) - - # Send command saying we're going to send a stream of objects - cmdtype = PushCommandType.putobjects - command = PushCommand(cmdtype, {}) - self.write(command) - - # Open a TarFile for writing uncompressed tar to a stream - tar = tarfile.TarFile.open(mode='w|', fileobj=self.file) - for obj in objects: - - logging.info('Sending object {}'.format(obj)) - objpath = ostree_object_path(repo, obj) - stat = os.stat(objpath) - - tar_info = tarfile.TarInfo(obj) - tar_info.mtime = stat.st_mtime - tar_info.size = stat.st_size - with open(objpath, 'rb') as obj_fp: - tar.addfile(tar_info, obj_fp) - - # We're done, close the tarfile - tar.close() - - def send_status(self, result, message=''): - cmdtype = PushCommandType.status - args = { - 'result': GLib.Variant('b', result), - 'message': GLib.Variant('s', message) - } - command = PushCommand(cmdtype, args) - self.write(command) - - def send_done(self): - command = PushCommand(PushCommandType.done, {}) - self.write(command) - - -class PushMessageReader(object): - def __init__(self, file, byteorder=sys.byteorder, tmpdir=None): - self.file = file - self.byteorder = byteorder - self.tmpdir = tmpdir - - def decode_header(self, header): - if len(header) != HEADER_SIZE: - raise Exception('Header is {:d} bytes, not {:d}'.format(len(header), HEADER_SIZE)) - order = msg_to_python_byteorder(chr(header[0])) - version = int(header[1]) - if version != PROTO_VERSION: - raise Exception('Unsupported protocol version {:d}'.format(version)) - cmdtype = PushCommandType(int(header[2])) - vlen = int.from_bytes(header[3:], order) - return order, version, cmdtype, vlen - - def decode_message(self, message, size, order): - if len(message) != size: - raise Exception('Expected {:d} bytes, but got {:d}'.format(size, len(message))) - data = GLib.Bytes.new(message) - variant = GLib.Variant.new_from_bytes(GLib.VariantType.new('a{sv}'), - data, False) - if order != self.byteorder: - variant = GLib.Variant.byteswap(variant) - - return variant - - def read(self): - header = self.file.read(HEADER_SIZE) - if not header: - # Remote end quit - return None, None - order, _, cmdtype, size = self.decode_header(header) - msg = self.file.read(size) - if len(msg) != size: - raise PushException('Did not receive full message') - args = self.decode_message(msg, size, order) - - return cmdtype, args - - def receive(self, allowed): - cmdtype, args = self.read() - if cmdtype is None: - raise PushException('Expected reply, got none') - if cmdtype not in allowed: - raise PushException('Unexpected reply type', cmdtype.name) - return cmdtype, args.unpack() - - def receive_info(self): - _, args = self.receive([PushCommandType.info]) - return args - - def receive_update(self): - _, args = self.receive([PushCommandType.update]) - return args - - def receive_putobjects(self, repo): - - received_objects = [] - - # Open a TarFile for reading uncompressed tar from a stream - tar = tarfile.TarFile.open(mode='r|', fileobj=self.file) - - # Extract every tarinfo into the temp location - # - # This should block while tar.next() reads the next - # tar object from the stream. - while True: - filepos = tar.fileobj.tell() - tar_info = tar.next() - if not tar_info: - # End of stream marker consists of two 512 Byte blocks. - # Current Python tarfile stops reading after the first block. - # Read the second block as well to ensure the stream is at - # the right position for following messages. - if tar.fileobj.tell() - filepos < 1024: - tar.fileobj.read(512) - break - - tar.extract(tar_info, self.tmpdir) - received_objects.append(tar_info.name) - - # Finished with this stream - tar.close() - - return received_objects - - def receive_status(self): - _, args = self.receive([PushCommandType.status]) - return args - - def receive_done(self): - _, args = self.receive([PushCommandType.done]) - return args - - -def parse_remote_location(remotepath): - """Parse remote artifact cache URL that's been specified in our config.""" - remote_host = remote_user = remote_repo = None - - url = urlparse(remotepath, scheme='file') - if url.scheme: - if url.scheme not in ['file', 'ssh']: - raise PushException('Only URL schemes file and ssh are allowed, ' - 'not "{}"'.format(url.scheme)) - remote_host = url.hostname - remote_user = url.username - remote_repo = url.path - remote_port = url.port or 22 - else: - # Scp/git style remote (user@hostname:path) - parts = remotepath.split('@', 1) - if len(parts) > 1: - remote_user = parts[0] - remainder = parts[1] - else: - remote_user = None - remainder = parts[0] - parts = remainder.split(':', 1) - if len(parts) != 2: - raise PushException('Remote repository "{}" does not ' - 'contain a hostname and path separated ' - 'by ":"'.format(remotepath)) - remote_host, remote_repo = parts - # This form doesn't make it possible to specify a non-standard port. - remote_port = 22 - - return remote_host, remote_user, remote_repo, remote_port - - -def ssh_commandline(remote_host, remote_user=None, remote_port=22): - if remote_host is None: - return [] - - ssh_cmd = ['ssh'] - if remote_user: - ssh_cmd += ['-l', remote_user] - if remote_port != 22: - ssh_cmd += ['-p', str(remote_port)] - ssh_cmd += [remote_host] - return ssh_cmd - - -def foo_run(func, args, stdin_fd, stdout_fd, stderr_fd): - sys.stdin = open(stdin_fd, 'r') - sys.stdout = open(stdout_fd, 'w') - sys.stderr = open(stderr_fd, 'w') - func(args) - - -class ProcessWithPipes(object): - def __init__(self, func, args, *, stderr=None): - r0, w0 = os.pipe() - r1, w1 = os.pipe() - if stderr is None: - r2, w2 = os.pipe() - else: - w2 = stderr.fileno() - self.proc = multiprocessing.Process(target=foo_run, args=(func, args, r0, w1, w2)) - self.proc.start() - self.stdin = open(w0, 'wb') - os.close(r0) - self.stdout = open(r1, 'rb') - os.close(w1) - if stderr is None: - self.stderr = open(r2, 'rb') - os.close(w2) - - # The eventual return code - self.returncode = -1 - - def wait(self): - self.proc.join() - self.returncode = self.proc.exitcode - - -class OSTreePusher(object): - def __init__(self, repopath, remotepath, branches=None, verbose=False, - debug=False, output=None): - self.repopath = repopath - self.remotepath = remotepath - self.verbose = verbose - self.debug = debug - self.output = output - - self.remote_host, self.remote_user, self.remote_repo, self.remote_port = \ - parse_remote_location(remotepath) - - if self.repopath is None: - self.repo = OSTree.Repo.new_default() - else: - self.repo = OSTree.Repo.new(Gio.File.new_for_path(self.repopath)) - self.repo.open(None) - - # Enumerate branches to push - if branches is None: - _, self.refs = self.repo.list_refs(None, None) - else: - self.refs = {} - for branch in branches: - _, rev = self.repo.resolve_rev(branch, False) - self.refs[branch] = rev - - # Start ssh - ssh_cmd = ssh_commandline(self.remote_host, self.remote_user, self.remote_port) - - ssh_cmd += ['bst-artifact-receive'] - if self.verbose: - ssh_cmd += ['--verbose'] - if self.debug: - ssh_cmd += ['--debug'] - if not self.remote_host: - ssh_cmd += ['--pull-url', self.remote_repo] - ssh_cmd += [self.remote_repo] - - logging.info('Executing {}'.format(' '.join(ssh_cmd))) - - if self.remote_host: - self.ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=self.output, - start_new_session=True) - else: - self.ssh = ProcessWithPipes(receive_main, ssh_cmd[1:], stderr=self.output) - - self.writer = PushMessageWriter(self.ssh.stdin) - self.reader = PushMessageReader(self.ssh.stdout) - - def needed_commits(self, remote, local, needed): - parent = local - if remote == '0' * 64: - # Nonexistent remote branch, use None for convenience - remote = None - while parent != remote: - needed.add(parent) - _, commit = self.repo.load_variant_if_exists(OSTree.ObjectType.COMMIT, - parent) - if commit is None: - raise PushException('Shallow history from commit {} does ' - 'not contain remote commit {}'.format(local, remote)) - parent = OSTree.commit_get_parent(commit) - if parent is None: - break - if remote is not None and parent != remote: - self.writer.send_done() - raise PushExistsException('Remote commit {} not descendent of ' - 'commit {}'.format(remote, local)) - - def needed_objects(self, commits): - objects = set() - for rev in commits: - _, reachable = self.repo.traverse_commit(rev, 0, None) - for obj in reachable: - objname = OSTree.object_to_string(obj[0], obj[1]) - if obj[1] == OSTree.ObjectType.FILE: - # Make this a filez since we're archive-z2 - objname += 'z' - elif obj[1] == OSTree.ObjectType.COMMIT: - # Add in detached metadata - metaobj = objname + 'meta' - metapath = ostree_object_path(self.repo, metaobj) - if os.path.exists(metapath): - objects.add(metaobj) - - # Add in Endless compat files - for suffix in ['sig', 'sizes2']: - metaobj = obj[0] + '.' + suffix - metapath = ostree_object_path(self.repo, metaobj) - if os.path.exists(metapath): - objects.add(metaobj) - objects.add(objname) - return objects - - def close(self): - self.ssh.stdin.close() - return self.ssh.wait() - - def run(self): - remote_refs = {} - update_refs = {} - - # Send info immediately - self.writer.send_info(self.repo, list(self.refs.keys())) - - # Receive remote info - logging.info('Receiving repository information') - args = self.reader.receive_info() - remote_mode = args['mode'] - if remote_mode != OSTree.RepoMode.ARCHIVE_Z2: - raise PushException('Can only push to archive-z2 repos') - remote_refs = args['refs'] - for branch, rev in self.refs.items(): - remote_rev = remote_refs.get(branch, '0' * 64) - if rev != remote_rev: - update_refs[branch] = remote_rev, rev - if not update_refs: - logging.info('Nothing to update') - self.writer.send_done() - raise PushExistsException('Nothing to update') - - # Send update command - logging.info('Sending update request') - self.writer.send_update(update_refs) - - # Receive status for update request - args = self.reader.receive_status() - if not args['result']: - self.writer.send_done() - raise PushException(args['message']) - - # Collect commits and objects to push - commits = set() - exc_info = None - ref_count = 0 - for branch, revs in update_refs.items(): - logging.info('Updating {} {} to {}'.format(branch, revs[0], revs[1])) - try: - self.needed_commits(revs[0], revs[1], commits) - ref_count += 1 - except PushExistsException: - if exc_info is None: - exc_info = sys.exc_info() - - # Re-raise PushExistsException if all refs exist already - if ref_count == 0 and exc_info: - raise exc_info[0].with_traceback(exc_info[1], exc_info[2]) - - logging.info('Enumerating objects to send') - objects = self.needed_objects(commits) - - # Send all the objects to receiver, checking status after each - self.writer.send_putobjects(self.repo, objects) - - # Inform receiver that all objects have been sent - self.writer.send_done() - - # Wait until receiver has completed - self.reader.receive_done() - - return self.close() - - -# OSTreeReceiver is on the receiving end of an OSTree push. -# -# Args: -# repopath (str): On-disk location of the receiving repository. -# pull_url (str): Redirection for clients who want to pull, not push. -# -class OSTreeReceiver(object): - def __init__(self, repopath, pull_url): - self.repopath = repopath - self.pull_url = pull_url - - if self.repopath is None: - self.repo = OSTree.Repo.new_default() - else: - self.repo = OSTree.Repo.new(Gio.File.new_for_path(self.repopath)) - self.repo.open(None) - - repo_tmp = os.path.join(self.repopath, 'tmp') - self.tmpdir = tempfile.mkdtemp(dir=repo_tmp, prefix='bst-push-') - self.writer = PushMessageWriter(sys.stdout.buffer) - self.reader = PushMessageReader(sys.stdin.buffer, tmpdir=self.tmpdir) - - # Set a sane umask before writing any objects - os.umask(0o0022) - - def close(self): - shutil.rmtree(self.tmpdir) - sys.stdout.flush() - return 0 - - def run(self): - try: - exit_code = self.do_run() - self.close() - return exit_code - except: - # BLIND EXCEPT - Just abort if we receive any exception, this - # can be a broken pipe, a tarfile read error when the remote - # connection is closed, a bug; whatever happens we want to cleanup. - self.close() - raise - - def do_run(self): - # Receive remote info - args = self.reader.receive_info() - remote_refs = args['refs'] - - # Send info back - self.writer.send_info(self.repo, list(remote_refs.keys()), - pull_url=self.pull_url) - - # Wait for update or done command - cmdtype, args = self.reader.receive([PushCommandType.update, - PushCommandType.done]) - if cmdtype == PushCommandType.done: - return 0 - update_refs = args - - profile_names = set() - for update_ref in update_refs: - # Strip off the SHA256 sum on the right of the reference, - # leaving the project and element name - project_and_element_name = re.sub(r"/[a-z0-9]+$", '', update_ref) - profile_names.add(project_and_element_name) - - profile_name = '_'.join(profile_names) - profile_start(Topics.ARTIFACT_RECEIVE, profile_name) - - self.writer.send_status(True) - - # Wait for putobjects or done - cmdtype, args = self.reader.receive([PushCommandType.putobjects, - PushCommandType.done]) - - if cmdtype == PushCommandType.done: - logging.debug('Received done before any objects, exiting') - return 0 - - # Receive the actual objects - received_objects = self.reader.receive_putobjects(self.repo) - - # Ensure that pusher has sent all objects - self.reader.receive_done() - - # If we didn't get any objects, we're done - if not received_objects: - return 0 - - # Got all objects, move them to the object store - for obj in received_objects: - tmp_path = os.path.join(self.tmpdir, obj) - obj_path = ostree_object_path(self.repo, obj) - os.makedirs(os.path.dirname(obj_path), exist_ok=True) - logging.debug('Renaming {} to {}'.format(tmp_path, obj_path)) - os.rename(tmp_path, obj_path) - - # Verify that we have the specified commit objects - for branch, revs in update_refs.items(): - _, has_object = self.repo.has_object(OSTree.ObjectType.COMMIT, revs[1], None) - if not has_object: - raise PushException('Missing commit {} for ref {}'.format(revs[1], branch)) - - # Finally, update the refs - for branch, revs in update_refs.items(): - logging.debug('Setting ref {} to {}'.format(branch, revs[1])) - self.repo.set_ref_immediate(None, branch, revs[1], None) - - # Inform pusher that everything is in place - self.writer.send_done() - - profile_end(Topics.ARTIFACT_RECEIVE, profile_name) - - return 0 - - -# initialize_push_connection() -# -# Test that we can connect to the remote bst-artifact-receive program, and -# receive the pull URL for this artifact cache. -# -# We don't want to make the user wait until the first artifact has been built -# to discover that they actually cannot push, so this should be called early. -# -# The SSH push protocol doesn't allow pulling artifacts. We don't want to -# require users to specify two URLs for a single cache, so we have the push -# protocol return the corresponding pull URL as part of the 'hello' response. -# -# Args: -# remote: The ssh remote url to push to -# -# Returns: -# (str): The URL that should be used for pushing to this cache. -# -# Raises: -# PushException if there was an issue connecting to the remote. -def initialize_push_connection(remote): - remote_host, remote_user, remote_repo, remote_port = parse_remote_location(remote) - ssh_cmd = ssh_commandline(remote_host, remote_user, remote_port) - - if remote_host: - # We need a short timeout here because if 'remote' isn't reachable at - # all, the process will hang until the connection times out. - ssh_cmd += ['-oConnectTimeout=3'] - - ssh_cmd += ['bst-artifact-receive', remote_repo] - - if remote_host: - ssh = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - else: - ssh_cmd += ['--pull-url', remote_repo] - ssh = ProcessWithPipes(receive_main, ssh_cmd[1:]) - - writer = PushMessageWriter(ssh.stdin) - reader = PushMessageReader(ssh.stdout) - - try: - writer.send_hello() - args = reader.receive_info() - writer.send_done() - - if 'pull_url' in args: - pull_url = args['pull_url'] - else: - raise PushException( - "Remote cache did not tell us its pull URL. This cache probably " - "requires updating to a newer version of `bst-artifact-receive`.") - except PushException as protocol_error: - # If we get a read error on the wire, let's first see if SSH reported - # an error such as 'Permission denied'. If so this will be much more - # useful to the user than the "Expected reply, got none" sort of - # message that reader.receive_info() will have raised. - ssh.wait() - if ssh.returncode != 0: - ssh_error = ssh.stderr.read().decode('unicode-escape').strip() - raise PushException("{}".format(ssh_error)) - else: - raise protocol_error - - return pull_url - - -# push() -# -# Run the pusher in process, with logging going to the output file -# -# Args: -# repo: The local repository path -# remote: The ssh remote url to push to -# branches: The refs to push -# output: The output where logging should go -# -# Returns: -# (bool): True if the remote was updated, False if it already existed -# and no updated was required -# -# Raises: -# PushException if there was an error -# -def push(repo, remote, branches, output): - - logging.basicConfig(format='%(module)s: %(levelname)s: %(message)s', - level=logging.INFO, stream=output) - - pusher = OSTreePusher(repo, remote, branches, True, False, output=output) - - def terminate_push(): - pusher.close() - - with _signals.terminator(terminate_push): - try: - pusher.run() - return True - except ConnectionError as e: - # Connection attempt failed or connection was terminated unexpectedly - terminate_push() - raise PushException("Connection failed") from e - except PushException: - terminate_push() - raise - except PushExistsException: - # If the commit already existed, just bail out - # on the push and dont bother re-raising the error - logging.info("Ref {} was already present in remote {}".format(branches, remote)) - terminate_push() - return False - - -@click.command(short_help="Receive pushed artifacts over ssh") -@click.option('--verbose', '-v', is_flag=True, default=False, help="Verbose mode") -@click.option('--debug', '-d', is_flag=True, default=False, help="Debug mode") -@click.option('--pull-url', type=str, required=True, - help="Clients who try to pull over SSH will be redirected here") -@click.argument('repo') -def receive_main(verbose, debug, pull_url, repo): - """A BuildStream sister program for receiving artifacts send to a shared artifact cache - """ - loglevel = logging.WARNING - if verbose: - loglevel = logging.INFO - if debug: - loglevel = logging.DEBUG - logging.basicConfig(format='%(module)s: %(levelname)s: %(message)s', - level=loglevel, stream=sys.stderr) - - receiver = OSTreeReceiver(repo, pull_url) - return receiver.run() diff --git a/buildstream/_ostree.py b/buildstream/_ostree.py index dfa7567de..6fee37dc0 100644 --- a/buildstream/_ostree.py +++ b/buildstream/_ostree.py @@ -27,7 +27,6 @@ # pylint: disable=bad-exception-context,catching-non-exception import os -from collections import namedtuple import gi from gi.repository.GLib import Variant, VariantDict @@ -117,80 +116,6 @@ def checkout(repo, path, commit_, user=False): raise OSTreeError("Failed to checkout commit '{}': {}".format(commit_, e.message)) from e -# commit(): -# -# Commit built artifact to cache. -# -# Files are all recorded with uid/gid 0 -# -# Args: -# repo (OSTree.Repo): The repo -# dir_ (str): The source directory to commit to the repo -# refs (list): A list of symbolic references (tag) for the commit -# -def commit(repo, dir_, refs): - - def commit_filter(repo, path, file_info): - - # For now, just set everything in the repo as uid/gid 0 - # - # In the future we'll want to extract virtualized file - # attributes from a fuse layer and use that. - # - file_info.set_attribute_uint32('unix::uid', 0) - file_info.set_attribute_uint32('unix::gid', 0) - - return OSTree.RepoCommitFilterResult.ALLOW - - commit_modifier = OSTree.RepoCommitModifier.new( - OSTree.RepoCommitModifierFlags.NONE, commit_filter) - - repo.prepare_transaction() - try: - # add tree to repository - mtree = OSTree.MutableTree.new() - repo.write_directory_to_mtree(Gio.File.new_for_path(dir_), - mtree, commit_modifier) - _, root = repo.write_mtree(mtree) - - # create root commit object, no parent, no branch - _, rev = repo.write_commit(None, None, None, None, root) - - # create refs - for ref in refs: - repo.transaction_set_ref(None, ref, rev) - - # complete repo transaction - repo.commit_transaction(None) - except GLib.GError as e: - - # Reraise any error as a buildstream error - repo.abort_transaction() - raise OSTreeError(e.message) from e - - -# set_ref(): -# -# Set symbolic reference to specified revision. -# -# Args: -# repo (OSTree.Repo): The repo -# ref (str): A symbolic reference (tag) for the commit -# rev (str): Commit checksum -# -def set_ref(repo, ref, rev): - - repo.prepare_transaction() - try: - repo.transaction_set_ref(None, ref, rev) - - # complete repo transaction - repo.commit_transaction(None) - except: - repo.abort_transaction() - raise - - # exists(): # # Checks wether a given commit or symbolic ref exists and @@ -244,172 +169,6 @@ def checksum(repo, ref): return checksum_ -OSTREE_GIO_FAST_QUERYINFO = ("standard::name,standard::type,standard::size," - "standard::is-symlink,standard::symlink-target," - "unix::device,unix::inode,unix::mode,unix::uid," - "unix::gid,unix::rdev") - - -DiffItem = namedtuple('DiffItem', ['src', 'src_info', - 'target', 'target_info', - 'src_checksum', 'target_checksum']) - - -# diff_dirs(): -# -# Compute the difference between directory a and b as 3 separate sets -# of OSTree.DiffItem. -# -# This is more-or-less a direct port of OSTree.diff_dirs (which cannot -# be used via PyGobject), but does not support options. -# -# Args: -# a (Gio.File): The first directory for the comparison. -# b (Gio.File): The second directory for the comparison. -# -# Returns: -# (modified, removed, added) -# -def diff_dirs(a, b): - # get_file_checksum(): - # - # Helper to compute the checksum of an arbitrary file (different - # objects have different methods to compute these). - # - def get_file_checksum(f, f_info): - if isinstance(f, OSTree.RepoFile): - return f.get_checksum() - else: - contents = None - if f_info.get_file_type() == Gio.FileType.REGULAR: - contents = f.read() - - csum = OSTree.checksum_file_from_input(f_info, None, contents, - OSTree.ObjectType.FILE) - return OSTree.checksum_from_bytes(csum) - - # diff_files(): - # - # Helper to compute a diff between two files. - # - def diff_files(a, a_info, b, b_info): - checksum_a = get_file_checksum(a, a_info) - checksum_b = get_file_checksum(b, b_info) - - if checksum_a != checksum_b: - return DiffItem(a, a_info, b, b_info, checksum_a, checksum_b) - - return None - - # diff_add_dir_recurse(): - # - # Helper to collect all files in a directory recursively. - # - def diff_add_dir_recurse(d): - added = [] - - dir_enum = d.enumerate_children(OSTREE_GIO_FAST_QUERYINFO, - Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS) - - for child_info in dir_enum: - name = child_info.get_name() - child = d.get_child(name) - added.append(child) - - if child_info.get_file_type() == Gio.FileType.DIRECTORY: - added.extend(diff_add_dir_recurse(child)) - - return added - - modified = [] - removed = [] - added = [] - - child_a_info = a.query_info(OSTREE_GIO_FAST_QUERYINFO, - Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS) - child_b_info = b.query_info(OSTREE_GIO_FAST_QUERYINFO, - Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS) - - # If both are directories and have the same checksum, we know that - # none of the underlying files changed, so we can save time. - if (child_a_info.get_file_type() == Gio.FileType.DIRECTORY and - child_b_info.get_file_type() == Gio.FileType.DIRECTORY and - isinstance(a, OSTree.RepoFileClass) and - isinstance(b, OSTree.RepoFileClass)): - if a.tree_get_contents_checksum() == b.tree_get_contents_checksum(): - return modified, removed, added - - # We walk through 'a' first - dir_enum = a.enumerate_children(OSTREE_GIO_FAST_QUERYINFO, - Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS) - for child_a_info in dir_enum: - name = child_a_info.get_name() - - child_a = a.get_child(name) - child_a_type = child_a_info.get_file_type() - - try: - child_b = b.get_child(name) - child_b_info = child_b.query_info(OSTREE_GIO_FAST_QUERYINFO, - Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS) - except GLib.Error as e: - # If the file does not exist in b, it has been removed - if e.matches(Gio.io_error_quark(), Gio.IOErrorEnum.NOT_FOUND): - removed.append(child_a) - continue - else: - raise - - # If the files differ but are of different types, we report a - # modification, saving a bit of time because we won't need a - # checksum - child_b_type = child_b_info.get_file_type() - if child_a_type != child_b_type: - diff_item = DiffItem(child_a, child_a_info, - child_b, child_b_info, - None, None) - modified.append(diff_item) - # Finally, we compute checksums and compare the file contents directly - else: - diff_item = diff_files(child_a, child_a_info, child_b, child_b_info) - - if diff_item: - modified.append(diff_item) - - # If the files are both directories, we recursively use - # this function to find differences - saving time if they - # are equal. - if child_a_type == Gio.FileType.DIRECTORY: - subdir = diff_dirs(child_a, child_b) - modified.extend(subdir[0]) - removed.extend(subdir[1]) - added.extend(subdir[2]) - - # Now we walk through 'b' to find any files that were added - dir_enum = b.enumerate_children(OSTREE_GIO_FAST_QUERYINFO, - Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS) - for child_b_info in dir_enum: - name = child_b_info.get_name() - - child_b = b.get_child(name) - - try: - child_a = a.get_child(name) - child_a_info = child_a.query_info(OSTREE_GIO_FAST_QUERYINFO, - Gio.FileQueryInfoFlags.NOFOLLOW_SYMLINKS) - except GLib.Error as e: - # If the file does not exist in 'a', it was added. - if e.matches(Gio.io_error_quark(), Gio.IOErrorEnum.NOT_FOUND): - added.append(child_b) - if child_b_info.get_file_type() == Gio.FileType.DIRECTORY: - added.extend(diff_add_dir_recurse(child_b)) - continue - else: - raise - - return modified, removed, added - - # fetch() # # Fetch new objects from a remote, if configured @@ -82,47 +82,6 @@ def assert_bwrap(): exit_bwrap("Bubblewrap too old") -################################################################## -# OSTree version requirements -################################################################## -REQUIRED_OSTREE_YEAR = 2017 -REQUIRED_OSTREE_RELEASE = 8 - - -def exit_ostree(reason): - print(reason + - "\nBuildStream requires OSTree >= v{}.{} with Python bindings. " - .format(REQUIRED_OSTREE_YEAR, REQUIRED_OSTREE_RELEASE) + - "Install it using your package manager (usually ostree or gir1.2-ostree-1.0).") - sys.exit(1) - - -def assert_ostree_version(): - platform = os.environ.get('BST_FORCE_BACKEND', '') or sys.platform - if platform.startswith('linux'): - try: - import gi - except ImportError: - print("BuildStream requires PyGObject (aka PyGI). Install it using" - " your package manager (usually pygobject3 or python-gi).") - sys.exit(1) - - try: - gi.require_version('OSTree', '1.0') - from gi.repository import OSTree - except ValueError: - exit_ostree("OSTree not found") - - try: - if OSTree.YEAR_VERSION < REQUIRED_OSTREE_YEAR or \ - (OSTree.YEAR_VERSION == REQUIRED_OSTREE_YEAR and - OSTree.RELEASE_VERSION < REQUIRED_OSTREE_RELEASE): - exit_ostree("OSTree v{}.{} is too old." - .format(OSTree.YEAR_VERSION, OSTree.RELEASE_VERSION)) - except AttributeError: - exit_ostree("OSTree is too old.") - - ########################################### # List the pre-built man pages to install # ########################################### @@ -154,14 +113,12 @@ def list_man_pages(): # So screw it, lets just use an env var. bst_install_entry_points = { 'console_scripts': [ - 'bst-artifact-receive = buildstream._artifactcache.pushreceive:receive_main', 'bst-artifact-server = buildstream._artifactcache.casserver:server_main' ], } if not os.environ.get('BST_ARTIFACTS_ONLY', ''): assert_bwrap() - assert_ostree_version() bst_install_entry_points['console_scripts'] += [ 'bst = buildstream._frontend:cli' ] |