summaryrefslogtreecommitdiff
path: root/src/buildstream/testing/runcli.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/testing/runcli.py')
-rw-r--r--src/buildstream/testing/runcli.py883
1 files changed, 883 insertions, 0 deletions
diff --git a/src/buildstream/testing/runcli.py b/src/buildstream/testing/runcli.py
new file mode 100644
index 000000000..8b3185143
--- /dev/null
+++ b/src/buildstream/testing/runcli.py
@@ -0,0 +1,883 @@
+#
+# Copyright (C) 2017 Codethink Limited
+# Copyright (C) 2018 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+"""
+runcli - Test fixtures used for running BuildStream commands
+============================================================
+
+:function:'cli' Use result = cli.run([arg1, arg2]) to run buildstream commands
+
+:function:'cli_integration' A variant of the main fixture that keeps persistent
+ artifact and source caches. It also does not use
+ the click test runner to avoid deadlock issues when
+ running `bst shell`, but unfortunately cannot produce
+ nice stacktraces.
+
+"""
+
+
+import os
+import re
+import sys
+import shutil
+import tempfile
+import itertools
+import traceback
+from contextlib import contextmanager, ExitStack
+from ruamel import yaml
+import pytest
+
+# XXX Using pytest private internals here
+#
+# We use pytest internals to capture the stdout/stderr during
+# a run of the buildstream CLI. We do this because click's
+# CliRunner convenience API (click.testing module) does not support
+# separation of stdout/stderr.
+#
+from _pytest.capture import MultiCapture, FDCapture, FDCaptureBinary
+
+# Import the main cli entrypoint
+from buildstream._frontend import cli as bst_cli
+from buildstream import _yaml
+from buildstream._cas import CASCache
+from buildstream.element import _get_normal_name, _compose_artifact_name
+
+# Special private exception accessor, for test case purposes
+from buildstream._exceptions import BstError, get_last_exception, get_last_task_error
+from buildstream._protos.buildstream.v2 import artifact_pb2
+
+
+# Wrapper for the click.testing result
+class Result():
+
+ def __init__(self,
+ exit_code=None,
+ exception=None,
+ exc_info=None,
+ output=None,
+ stderr=None):
+ self.exit_code = exit_code
+ self.exc = exception
+ self.exc_info = exc_info
+ self.output = output
+ self.stderr = stderr
+ self.unhandled_exception = False
+
+ # The last exception/error state is stored at exception
+ # creation time in BstError(), but this breaks down with
+ # recoverable errors where code blocks ignore some errors
+ # and fallback to alternative branches.
+ #
+ # For this reason, we just ignore the exception and errors
+ # in the case that the exit code reported is 0 (success).
+ #
+ if self.exit_code != 0:
+
+ # Check if buildstream failed to handle an
+ # exception, topevel CLI exit should always
+ # be a SystemExit exception.
+ #
+ if not isinstance(exception, SystemExit):
+ self.unhandled_exception = True
+
+ self.exception = get_last_exception()
+ self.task_error_domain, \
+ self.task_error_reason = get_last_task_error()
+ else:
+ self.exception = None
+ self.task_error_domain = None
+ self.task_error_reason = None
+
+ # assert_success()
+ #
+ # Asserts that the buildstream session completed successfully
+ #
+ # Args:
+ # fail_message (str): An optional message to override the automatic
+ # assertion error messages
+ # Raises:
+ # (AssertionError): If the session did not complete successfully
+ #
+ def assert_success(self, fail_message=''):
+ assert self.exit_code == 0, fail_message
+ assert self.exc is None, fail_message
+ assert self.exception is None, fail_message
+ assert self.unhandled_exception is False
+
+ # assert_main_error()
+ #
+ # Asserts that the buildstream session failed, and that
+ # the main process error report is as expected
+ #
+ # Args:
+ # error_domain (ErrorDomain): The domain of the error which occurred
+ # error_reason (any): The reason field of the error which occurred
+ # fail_message (str): An optional message to override the automatic
+ # assertion error messages
+ # debug (bool): If true, prints information regarding the exit state of the result()
+ # Raises:
+ # (AssertionError): If any of the assertions fail
+ #
+ def assert_main_error(self,
+ error_domain,
+ error_reason,
+ fail_message='',
+ *, debug=False):
+ if debug:
+ print(
+ """
+ Exit code: {}
+ Exception: {}
+ Domain: {}
+ Reason: {}
+ """.format(
+ self.exit_code,
+ self.exception,
+ self.exception.domain,
+ self.exception.reason
+ ))
+ assert self.exit_code == -1, fail_message
+ assert self.exc is not None, fail_message
+ assert self.exception is not None, fail_message
+ assert isinstance(self.exception, BstError), fail_message
+ assert self.unhandled_exception is False
+
+ assert self.exception.domain == error_domain, fail_message
+ assert self.exception.reason == error_reason, fail_message
+
+ # assert_task_error()
+ #
+ # Asserts that the buildstream session failed, and that
+ # the child task error which caused buildstream to exit
+ # is as expected.
+ #
+ # Args:
+ # error_domain (ErrorDomain): The domain of the error which occurred
+ # error_reason (any): The reason field of the error which occurred
+ # fail_message (str): An optional message to override the automatic
+ # assertion error messages
+ # Raises:
+ # (AssertionError): If any of the assertions fail
+ #
+ def assert_task_error(self,
+ error_domain,
+ error_reason,
+ fail_message=''):
+
+ assert self.exit_code == -1, fail_message
+ assert self.exc is not None, fail_message
+ assert self.exception is not None, fail_message
+ assert isinstance(self.exception, BstError), fail_message
+ assert self.unhandled_exception is False
+
+ assert self.task_error_domain == error_domain, fail_message
+ assert self.task_error_reason == error_reason, fail_message
+
+ # assert_shell_error()
+ #
+ # Asserts that the buildstream created a shell and that the task in the
+ # shell failed.
+ #
+ # Args:
+ # fail_message (str): An optional message to override the automatic
+ # assertion error messages
+ # Raises:
+ # (AssertionError): If any of the assertions fail
+ #
+ def assert_shell_error(self, fail_message=''):
+ assert self.exit_code == 1, fail_message
+
+ # get_start_order()
+ #
+ # Gets the list of elements processed in a given queue, in the
+ # order of their first appearances in the session.
+ #
+ # Args:
+ # activity (str): The queue activity name (like 'fetch')
+ #
+ # Returns:
+ # (list): A list of element names in the order which they first appeared in the result
+ #
+ def get_start_order(self, activity):
+ results = re.findall(r'\[\s*{}:(\S+)\s*\]\s*START\s*.*\.log'.format(activity), self.stderr)
+ if results is None:
+ return []
+ return list(results)
+
+ # get_tracked_elements()
+ #
+ # Produces a list of element names on which tracking occurred
+ # during the session.
+ #
+ # This is done by parsing the buildstream stderr log
+ #
+ # Returns:
+ # (list): A list of element names
+ #
+ def get_tracked_elements(self):
+ tracked = re.findall(r'\[\s*track:(\S+)\s*]', self.stderr)
+ if tracked is None:
+ return []
+
+ return list(tracked)
+
+ def get_pushed_elements(self):
+ pushed = re.findall(r'\[\s*push:(\S+)\s*\]\s*INFO\s*Pushed artifact', self.stderr)
+ if pushed is None:
+ return []
+
+ return list(pushed)
+
+ def get_pulled_elements(self):
+ pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled artifact', self.stderr)
+ if pulled is None:
+ return []
+
+ return list(pulled)
+
+
+class Cli():
+
+ def __init__(self, directory, verbose=True, default_options=None):
+ self.directory = directory
+ self.config = None
+ self.verbose = verbose
+ self.artifact = TestArtifact()
+
+ if default_options is None:
+ default_options = []
+
+ self.default_options = default_options
+
+ # configure():
+ #
+ # Serializes a user configuration into a buildstream.conf
+ # to use for this test cli.
+ #
+ # Args:
+ # config (dict): The user configuration to use
+ #
+ def configure(self, config):
+ if self.config is None:
+ self.config = {}
+
+ for key, val in config.items():
+ self.config[key] = val
+
+ # remove_artifact_from_cache():
+ #
+ # Remove given element artifact from artifact cache
+ #
+ # Args:
+ # project (str): The project path under test
+ # element_name (str): The name of the element artifact
+ # cache_dir (str): Specific cache dir to remove artifact from
+ #
+ def remove_artifact_from_cache(self, project, element_name,
+ *, cache_dir=None):
+ # Read configuration to figure out where artifacts are stored
+ if not cache_dir:
+ default = os.path.join(project, 'cache')
+
+ if self.config is not None:
+ cache_dir = self.config.get('cachedir', default)
+ else:
+ cache_dir = default
+
+ self.artifact.remove_artifact_from_cache(cache_dir, element_name)
+
+ # run():
+ #
+ # Runs buildstream with the given arguments, additionally
+ # also passes some global options to buildstream in order
+ # to stay contained in the testing environment.
+ #
+ # Args:
+ # configure (bool): Whether to pass a --config argument
+ # project (str): An optional path to a project
+ # silent (bool): Whether to pass --no-verbose
+ # env (dict): Environment variables to temporarily set during the test
+ # args (list): A list of arguments to pass buildstream
+ # binary_capture (bool): Whether to capture the stdout/stderr as binary
+ #
+ def run(self, configure=True, project=None, silent=False, env=None,
+ cwd=None, options=None, args=None, binary_capture=False):
+ if args is None:
+ args = []
+ if options is None:
+ options = []
+
+ # We may have been passed e.g. pathlib.Path or py.path
+ args = [str(x) for x in args]
+ project = str(project)
+
+ options = self.default_options + options
+
+ with ExitStack() as stack:
+ bst_args = ['--no-colors']
+
+ if silent:
+ bst_args += ['--no-verbose']
+
+ if configure:
+ config_file = stack.enter_context(
+ configured(self.directory, self.config)
+ )
+ bst_args += ['--config', config_file]
+
+ if project:
+ bst_args += ['--directory', project]
+
+ for option, value in options:
+ bst_args += ['--option', option, value]
+
+ bst_args += args
+
+ if cwd is not None:
+ stack.enter_context(chdir(cwd))
+
+ if env is not None:
+ stack.enter_context(environment(env))
+
+ # Ensure we have a working stdout - required to work
+ # around a bug that appears to cause AIX to close
+ # sys.__stdout__ after setup.py
+ try:
+ sys.__stdout__.fileno()
+ except ValueError:
+ sys.__stdout__ = open('/dev/stdout', 'w')
+
+ result = self._invoke(bst_cli, bst_args, binary_capture=binary_capture)
+
+ # Some informative stdout we can observe when anything fails
+ if self.verbose:
+ command = "bst " + " ".join(bst_args)
+ print("BuildStream exited with code {} for invocation:\n\t{}"
+ .format(result.exit_code, command))
+ if result.output:
+ print("Program output was:\n{}".format(result.output))
+ if result.stderr:
+ print("Program stderr was:\n{}".format(result.stderr))
+
+ if result.exc_info and result.exc_info[0] != SystemExit:
+ traceback.print_exception(*result.exc_info)
+
+ return result
+
+ def _invoke(self, cli_object, args=None, binary_capture=False):
+ exc_info = None
+ exception = None
+ exit_code = 0
+
+ # Temporarily redirect sys.stdin to /dev/null to ensure that
+ # Popen doesn't attempt to read pytest's dummy stdin.
+ old_stdin = sys.stdin
+ with open(os.devnull) as devnull:
+ sys.stdin = devnull
+ capture_kind = FDCaptureBinary if binary_capture else FDCapture
+ capture = MultiCapture(out=True, err=True, in_=False, Capture=capture_kind)
+ capture.start_capturing()
+
+ try:
+ cli_object.main(args=args or (), prog_name=cli_object.name)
+ except SystemExit as e:
+ if e.code != 0:
+ exception = e
+
+ exc_info = sys.exc_info()
+
+ exit_code = e.code
+ if not isinstance(exit_code, int):
+ sys.stdout.write('Program exit code was not an integer: ')
+ sys.stdout.write(str(exit_code))
+ sys.stdout.write('\n')
+ exit_code = 1
+ except Exception as e: # pylint: disable=broad-except
+ exception = e
+ exit_code = -1
+ exc_info = sys.exc_info()
+ finally:
+ sys.stdout.flush()
+
+ sys.stdin = old_stdin
+ out, err = capture.readouterr()
+ capture.stop_capturing()
+
+ return Result(exit_code=exit_code,
+ exception=exception,
+ exc_info=exc_info,
+ output=out,
+ stderr=err)
+
+ # Fetch an element state by name by
+ # invoking bst show on the project with the CLI
+ #
+ # If you need to get the states of multiple elements,
+ # then use get_element_states(s) instead.
+ #
+ def get_element_state(self, project, element_name):
+ result = self.run(project=project, silent=True, args=[
+ 'show',
+ '--deps', 'none',
+ '--format', '%{state}',
+ element_name
+ ])
+ result.assert_success()
+ return result.output.strip()
+
+ # Fetch the states of elements for a given target / deps
+ #
+ # Returns a dictionary with the element names as keys
+ #
+ def get_element_states(self, project, targets, deps='all'):
+ result = self.run(project=project, silent=True, args=[
+ 'show',
+ '--deps', deps,
+ '--format', '%{name}||%{state}',
+ *targets
+ ])
+ result.assert_success()
+ lines = result.output.splitlines()
+ states = {}
+ for line in lines:
+ split = line.split(sep='||')
+ states[split[0]] = split[1]
+ return states
+
+ # Fetch an element's cache key by invoking bst show
+ # on the project with the CLI
+ #
+ def get_element_key(self, project, element_name):
+ result = self.run(project=project, silent=True, args=[
+ 'show',
+ '--deps', 'none',
+ '--format', '%{full-key}',
+ element_name
+ ])
+ result.assert_success()
+ return result.output.strip()
+
+ # Get the decoded config of an element.
+ #
+ def get_element_config(self, project, element_name):
+ result = self.run(project=project, silent=True, args=[
+ 'show',
+ '--deps', 'none',
+ '--format', '%{config}',
+ element_name
+ ])
+
+ result.assert_success()
+ return yaml.safe_load(result.output)
+
+ # Fetch the elements that would be in the pipeline with the given
+ # arguments.
+ #
+ def get_pipeline(self, project, elements, except_=None, scope='plan'):
+ if except_ is None:
+ except_ = []
+
+ args = ['show', '--deps', scope, '--format', '%{name}']
+ args += list(itertools.chain.from_iterable(zip(itertools.repeat('--except'), except_)))
+
+ result = self.run(project=project, silent=True, args=args + elements)
+ result.assert_success()
+ return result.output.splitlines()
+
+ # Fetch an element's complete artifact name, cache_key will be generated
+ # if not given.
+ #
+ def get_artifact_name(self, project, project_name, element_name, cache_key=None):
+ if not cache_key:
+ cache_key = self.get_element_key(project, element_name)
+
+ # Replace path separator and chop off the .bst suffix for normal name
+ normal_name = _get_normal_name(element_name)
+ return _compose_artifact_name(project_name, normal_name, cache_key)
+
+
+class CliIntegration(Cli):
+
+ # run()
+ #
+ # This supports the same arguments as Cli.run() and additionally
+ # it supports the project_config keyword argument.
+ #
+ # This will first load the project.conf file from the specified
+ # project directory ('project' keyword argument) and perform substitutions
+ # of any {project_dir} specified in the existing project.conf.
+ #
+ # If the project_config parameter is specified, it is expected to
+ # be a dictionary of additional project configuration options, and
+ # will be composited on top of the already loaded project.conf
+ #
+ def run(self, *args, project_config=None, **kwargs):
+
+ # First load the project.conf and substitute {project_dir}
+ #
+ # Save the original project.conf, because we will run more than
+ # once in the same temp directory
+ #
+ project_directory = kwargs['project']
+ project_filename = os.path.join(project_directory, 'project.conf')
+ project_backup = os.path.join(project_directory, 'project.conf.backup')
+ project_load_filename = project_filename
+
+ if not os.path.exists(project_backup):
+ shutil.copy(project_filename, project_backup)
+ else:
+ project_load_filename = project_backup
+
+ with open(project_load_filename) as f:
+ config = f.read()
+ config = config.format(project_dir=project_directory)
+
+ if project_config is not None:
+
+ # If a custom project configuration dictionary was
+ # specified, composite it on top of the already
+ # substituted base project configuration
+ #
+ base_config = _yaml.load_data(config)
+
+ # In order to leverage _yaml.composite_dict(), both
+ # dictionaries need to be loaded via _yaml.load_data() first
+ #
+ with tempfile.TemporaryDirectory(dir=project_directory) as scratchdir:
+
+ temp_project = os.path.join(scratchdir, 'project.conf')
+ with open(temp_project, 'w') as f:
+ yaml.safe_dump(project_config, f)
+
+ project_config = _yaml.load(temp_project)
+
+ _yaml.composite_dict(base_config, project_config)
+
+ base_config = _yaml.node_sanitize(base_config)
+ _yaml.dump(base_config, project_filename)
+
+ else:
+
+ # Otherwise, just dump it as is
+ with open(project_filename, 'w') as f:
+ f.write(config)
+
+ return super().run(*args, **kwargs)
+
+
+class CliRemote(CliIntegration):
+
+ # ensure_services():
+ #
+ # Make sure that required services are configured and that
+ # non-required ones are not.
+ #
+ # Args:
+ # actions (bool): Whether to use the 'action-cache' service
+ # artifacts (bool): Whether to use the 'artifact-cache' service
+ # execution (bool): Whether to use the 'execution' service
+ # sources (bool): Whether to use the 'source-cache' service
+ # storage (bool): Whether to use the 'storage' service
+ #
+ # Returns a list of configured services (by names).
+ #
+ def ensure_services(self, actions=True, execution=True, storage=True,
+ artifacts=False, sources=False):
+ # Build a list of configured services by name:
+ configured_services = []
+ if not self.config:
+ return configured_services
+
+ if 'remote-execution' in self.config:
+ rexec_config = self.config['remote-execution']
+
+ if 'action-cache-service' in rexec_config:
+ if actions:
+ configured_services.append('action-cache')
+ else:
+ rexec_config.pop('action-cache-service')
+
+ if 'execution-service' in rexec_config:
+ if execution:
+ configured_services.append('execution')
+ else:
+ rexec_config.pop('execution-service')
+
+ if 'storage-service' in rexec_config:
+ if storage:
+ configured_services.append('storage')
+ else:
+ rexec_config.pop('storage-service')
+
+ if 'artifacts' in self.config:
+ if artifacts:
+ configured_services.append('artifact-cache')
+ else:
+ self.config.pop('artifacts')
+
+ if 'source-caches' in self.config:
+ if sources:
+ configured_services.append('source-cache')
+ else:
+ self.config.pop('source-caches')
+
+ return configured_services
+
+
+class TestArtifact():
+
+ # remove_artifact_from_cache():
+ #
+ # Remove given element artifact from artifact cache
+ #
+ # Args:
+ # cache_dir (str): Specific cache dir to remove artifact from
+ # element_name (str): The name of the element artifact
+ #
+ def remove_artifact_from_cache(self, cache_dir, element_name):
+
+ cache_dir = os.path.join(cache_dir, 'artifacts', 'refs')
+
+ normal_name = element_name.replace(os.sep, '-')
+ cache_dir = os.path.splitext(os.path.join(cache_dir, 'test', normal_name))[0]
+ shutil.rmtree(cache_dir)
+
+ # is_cached():
+ #
+ # Check if given element has a cached artifact
+ #
+ # Args:
+ # cache_dir (str): Specific cache dir to check
+ # element (Element): The element object
+ # element_key (str): The element's cache key
+ #
+ # Returns:
+ # (bool): If the cache contains the element's artifact
+ #
+ def is_cached(self, cache_dir, element, element_key):
+
+ # cas = CASCache(str(cache_dir))
+ artifact_ref = element.get_artifact_name(element_key)
+ return os.path.exists(os.path.join(cache_dir, 'artifacts', 'refs', artifact_ref))
+
+ # get_digest():
+ #
+ # Get the digest for a given element's artifact files
+ #
+ # Args:
+ # cache_dir (str): Specific cache dir to check
+ # element (Element): The element object
+ # element_key (str): The element's cache key
+ #
+ # Returns:
+ # (Digest): The digest stored in the ref
+ #
+ def get_digest(self, cache_dir, element, element_key):
+
+ artifact_ref = element.get_artifact_name(element_key)
+ artifact_dir = os.path.join(cache_dir, 'artifacts', 'refs')
+ artifact_proto = artifact_pb2.Artifact()
+ with open(os.path.join(artifact_dir, artifact_ref), 'rb') as f:
+ artifact_proto.ParseFromString(f.read())
+ return artifact_proto.files
+
+ # extract_buildtree():
+ #
+ # Context manager for extracting an elements artifact buildtree for
+ # inspection.
+ #
+ # Args:
+ # tmpdir (LocalPath): pytest fixture for the tests tmp dir
+ # digest (Digest): The element directory digest to extract
+ #
+ # Yields:
+ # (str): path to extracted buildtree directory, does not guarantee
+ # existence.
+ @contextmanager
+ def extract_buildtree(self, cache_dir, tmpdir, ref):
+ artifact = artifact_pb2.Artifact()
+ try:
+ with open(os.path.join(cache_dir, 'artifacts', 'refs', ref), 'rb') as f:
+ artifact.ParseFromString(f.read())
+ except FileNotFoundError:
+ yield None
+ else:
+ if str(artifact.buildtree):
+ with self._extract_subdirectory(tmpdir, artifact.buildtree) as f:
+ yield f
+ else:
+ yield None
+
+ # _extract_subdirectory():
+ #
+ # Context manager for extracting an element artifact for inspection,
+ # providing an expected path for a given subdirectory
+ #
+ # Args:
+ # tmpdir (LocalPath): pytest fixture for the tests tmp dir
+ # digest (Digest): The element directory digest to extract
+ # subdir (str): Subdirectory to path
+ #
+ # Yields:
+ # (str): path to extracted subdir directory, does not guarantee
+ # existence.
+ @contextmanager
+ def _extract_subdirectory(self, tmpdir, digest):
+ with tempfile.TemporaryDirectory() as extractdir:
+ try:
+ cas = CASCache(str(tmpdir))
+ cas.checkout(extractdir, digest)
+ yield extractdir
+ except FileNotFoundError:
+ yield None
+
+
+# Main fixture
+#
+# Use result = cli.run([arg1, arg2]) to run buildstream commands
+#
+@pytest.fixture()
+def cli(tmpdir):
+ directory = os.path.join(str(tmpdir), 'cache')
+ os.makedirs(directory)
+ return Cli(directory)
+
+
+# A variant of the main fixture that keeps persistent artifact and
+# source caches.
+#
+# It also does not use the click test runner to avoid deadlock issues
+# when running `bst shell`, but unfortunately cannot produce nice
+# stacktraces.
+@pytest.fixture()
+def cli_integration(tmpdir, integration_cache):
+ directory = os.path.join(str(tmpdir), 'cache')
+ os.makedirs(directory)
+
+ if os.environ.get('BST_FORCE_BACKEND') == 'unix':
+ fixture = CliIntegration(directory, default_options=[('linux', 'False')])
+ else:
+ fixture = CliIntegration(directory)
+
+ # We want to cache sources for integration tests more permanently,
+ # to avoid downloading the huge base-sdk repeatedly
+ fixture.configure({
+ 'cachedir': integration_cache.cachedir,
+ 'sourcedir': integration_cache.sources,
+ })
+
+ yield fixture
+
+ # remove following folders if necessary
+ try:
+ shutil.rmtree(os.path.join(integration_cache.cachedir, 'build'))
+ except FileNotFoundError:
+ pass
+ try:
+ shutil.rmtree(os.path.join(integration_cache.cachedir, 'tmp'))
+ except FileNotFoundError:
+ pass
+
+
+# A variant of the main fixture that is configured for remote-execution.
+#
+# It also does not use the click test runner to avoid deadlock issues
+# when running `bst shell`, but unfortunately cannot produce nice
+# stacktraces.
+@pytest.fixture()
+def cli_remote_execution(tmpdir, remote_services):
+ directory = os.path.join(str(tmpdir), 'cache')
+ os.makedirs(directory)
+
+ fixture = CliRemote(directory)
+
+ if remote_services.artifact_service:
+ fixture.configure({'artifacts': [{
+ 'url': remote_services.artifact_service,
+ }]})
+
+ remote_execution = {}
+ if remote_services.action_service:
+ remote_execution['action-cache-service'] = {
+ 'url': remote_services.action_service,
+ }
+ if remote_services.exec_service:
+ remote_execution['execution-service'] = {
+ 'url': remote_services.exec_service,
+ }
+ if remote_services.storage_service:
+ remote_execution['storage-service'] = {
+ 'url': remote_services.storage_service,
+ }
+ if remote_execution:
+ fixture.configure({'remote-execution': remote_execution})
+
+ if remote_services.source_service:
+ fixture.configure({'source-caches': [{
+ 'url': remote_services.source_service,
+ }]})
+
+ return fixture
+
+
+@contextmanager
+def chdir(directory):
+ old_dir = os.getcwd()
+ os.chdir(directory)
+ yield
+ os.chdir(old_dir)
+
+
+@contextmanager
+def environment(env):
+
+ old_env = {}
+ for key, value in env.items():
+ old_env[key] = os.environ.get(key)
+ if value is None:
+ os.environ.pop(key, None)
+ else:
+ os.environ[key] = value
+
+ yield
+
+ for key, value in old_env.items():
+ if value is None:
+ os.environ.pop(key, None)
+ else:
+ os.environ[key] = value
+
+
+@contextmanager
+def configured(directory, config=None):
+
+ # Ensure we've at least relocated the caches to a temp directory
+ if not config:
+ config = {}
+
+ if not config.get('sourcedir', False):
+ config['sourcedir'] = os.path.join(directory, 'sources')
+ if not config.get('cachedir', False):
+ config['cachedir'] = directory
+ if not config.get('logdir', False):
+ config['logdir'] = os.path.join(directory, 'logs')
+
+ # Dump it and yield the filename for test scripts to feed it
+ # to buildstream as an artument
+ filename = os.path.join(directory, "buildstream.conf")
+ _yaml.dump(config, filename)
+
+ yield filename