#
# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see .
#
# Authors:
# Raoul Hidalgo Charman
#
# Pylint doesn't play well with fixtures and dependency injection from pytest
# pylint: disable=redefined-outer-name
from contextlib import contextmanager
import os
import shutil
import pytest
from buildstream._exceptions import ErrorDomain
from buildstream._project import Project
from buildstream import _yaml
from buildstream.testing import cli # pylint: disable=unused-import
from buildstream.testing import create_repo
from tests.testutils import create_artifact_share, dummy_context
DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "project")
def move_local_cas_to_remote_source_share(local, remote):
    # Simulate a populated remote source cache: transplant the locally
    # cached source protos and CAS into the share's repo, then wipe the
    # local source/artifact caches so later operations are forced to pull.
    shutil.rmtree(os.path.join(remote, 'repo', 'cas'))
    for moved in ('source_protos', 'cas'):
        shutil.move(os.path.join(local, moved), os.path.join(remote, 'repo'))
    for removed in ('sources', 'artifacts'):
        shutil.rmtree(os.path.join(local, removed))
def create_test_element(tmpdir, project_dir):
    # Commit the project's 'files' directory into a fresh git repo and
    # write an import element ('fetch.bst') that tracks it at that ref.
    #
    # Returns a (element_name, repo, ref) triple for use by the tests.
    repo = create_repo('git', str(tmpdir))
    ref = repo.create(os.path.join(project_dir, 'files'))
    element_name = 'fetch.bst'
    target = os.path.join(project_dir, 'elements', element_name)
    _yaml.roundtrip_dump(
        {
            'kind': 'import',
            'sources': [repo.source_config(ref=ref)],
        },
        target)
    return element_name, repo, ref
@contextmanager
def context_with_source_cache(cli, cache, share, tmpdir):
    # Yield a dummy Context configured with ``share`` as a remote source
    # cache and ``cache`` as the local cache directory.  The same
    # configuration is applied to ``cli`` so that CLI invocations and the
    # yielded context agree.
    config = {
        'scheduler': {'pushers': 1},
        'source-caches': {'url': share.repo},
        'cachedir': cache,
    }
    config_path = str(tmpdir.join('buildstream.conf'))
    _yaml.roundtrip_dump(config, file=config_path)
    cli.configure(config)

    with dummy_context(config=config_path) as context:
        yield context
@pytest.mark.datafiles(DATA_DIR)
def test_source_fetch(cli, tmpdir, datafiles):
    # Build once to populate the local caches, transplant the cached
    # source into the remote share, then verify 'source fetch' pulls it
    # from the share instead of fetching from the origin repo.
    project_dir = str(datafiles)
    element_name, _repo, _ref = create_test_element(tmpdir, project_dir)
    cache_dir = os.path.join(str(tmpdir), 'cache')

    # use artifact cache for sources for now, they should work the same
    with create_artifact_share(os.path.join(str(tmpdir), 'sourceshare')) as share:
        with context_with_source_cache(cli, cache_dir, share, tmpdir) as context:
            project = Project(project_dir, context)
            project.ensure_fully_loaded()

            element = project.load_elements([element_name])[0]
            assert not element._source_cached()
            source = list(element.sources())[0]

            cas = context.get_cascache()
            assert not cas.contains(source._get_source_name())

            # Just check that we sensibly fetch and build the element
            result = cli.run(project=project_dir, args=['build', element_name])
            result.assert_success()
            assert os.listdir(os.path.join(str(tmpdir), 'cache', 'sources', 'git')) != []

            # get root digest of source
            digest = context.sourcecache.export(source)._get_digest()

            move_local_cas_to_remote_source_share(str(cache_dir), share.directory)

            # check the share has the object
            assert share.has_object(digest)

            assert cli.get_element_state(project_dir, element_name) == 'fetch needed'

            # Now fetch the source and check
            result = cli.run(project=project_dir, args=['source', 'fetch', element_name])
            result.assert_success()
            assert "Pulled source" in result.stderr

            # check that we have the source in the cas now and it's not fetched
            assert element._source_cached()
            assert os.listdir(os.path.join(str(tmpdir), 'cache', 'sources', 'git')) == []
@pytest.mark.datafiles(DATA_DIR)
def test_fetch_fallback(cli, tmpdir, datafiles):
    # With an empty remote source cache, 'source fetch' should report the
    # cache miss and fall back to fetching from the origin repo.
    project_dir = str(datafiles)
    element_name, repo, ref = create_test_element(tmpdir, project_dir)
    cache_dir = os.path.join(str(tmpdir), 'cache')

    # use artifact cache for sources for now, they should work the same
    with create_artifact_share(os.path.join(str(tmpdir), 'sourceshare')) as share:
        with context_with_source_cache(cli, cache_dir, share, tmpdir) as context:
            project = Project(project_dir, context)
            project.ensure_fully_loaded()

            element = project.load_elements([element_name])[0]
            assert not element._source_cached()
            source = list(element.sources())[0]

            assert not context.get_cascache().contains(source._get_source_name())
            assert not os.path.exists(os.path.join(cache_dir, 'sources'))

            # Now check if it falls back to the source fetch method.
            result = cli.run(project=project_dir, args=['source', 'fetch', element_name])
            result.assert_success()
            brief_key = source._get_brief_display_key()
            assert ("Remote source service ({}) does not have source {} cached"
                    .format(share.repo, brief_key)) in result.stderr
            assert ("SUCCESS Fetching from {}"
                    .format(repo.source_config(ref=ref)['url'])) in result.stderr

            # Check that the source in both in the source dir and the local CAS
            assert element._source_cached()
@pytest.mark.datafiles(DATA_DIR)
def test_pull_fail(cli, tmpdir, datafiles):
    # When the source exists neither in the remote share nor in the origin
    # repo, a build must fail with a plugin task error.
    project_dir = str(datafiles)
    element_name, repo, _ref = create_test_element(tmpdir, project_dir)
    cache_dir = os.path.join(str(tmpdir), 'cache')

    with create_artifact_share(os.path.join(str(tmpdir), 'sourceshare')) as share:
        with context_with_source_cache(cli, cache_dir, share, tmpdir) as context:
            project = Project(project_dir, context)
            project.ensure_fully_loaded()

            element = project.load_elements([element_name])[0]
            assert not element._source_cached()
            source = list(element.sources())[0]

            # remove files and check that it doesn't build
            shutil.rmtree(repo.repo)

            # Should fail in stream, with a plugin task causing the error
            result = cli.run(project=project_dir, args=['build', element_name])
            result.assert_main_error(ErrorDomain.STREAM, None)
            result.assert_task_error(ErrorDomain.PLUGIN, None)
            assert ("Remote source service ({}) does not have source {} cached"
                    .format(share.repo, source._get_brief_display_key())) in result.stderr
@pytest.mark.datafiles(DATA_DIR)
def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles):
    # If the share still advertises the source proto but its CAS objects
    # are gone, 'source fetch' must fall back to the origin repo.
    project_dir = str(datafiles)
    element_name, repo, ref = create_test_element(tmpdir, project_dir)
    cache_dir = os.path.join(str(tmpdir), 'cache')

    # use artifact cache for sources for now, they should work the same
    with create_artifact_share(os.path.join(str(tmpdir), 'sourceshare')) as share:
        with context_with_source_cache(cli, cache_dir, share, tmpdir) as context:
            project = Project(project_dir, context)
            project.ensure_fully_loaded()

            element = project.load_elements([element_name])[0]
            assert not element._source_cached()
            source = list(element.sources())[0]

            assert not context.get_cascache().contains(source._get_source_name())

            # Just check that we sensibly fetch and build the element
            result = cli.run(project=project_dir, args=['build', element_name])
            result.assert_success()
            assert os.listdir(os.path.join(str(tmpdir), 'cache', 'sources', 'git')) != []

            # get root digest of source
            digest = context.sourcecache.export(source)._get_digest()

            move_local_cas_to_remote_source_share(str(cache_dir), share.directory)

            # Remove the cas content, only keep the proto and such around
            shutil.rmtree(os.path.join(str(tmpdir), "sourceshare", "repo", "cas", "objects"))
            # check the share doesn't have the object
            assert not share.has_object(digest)

            assert cli.get_element_state(project_dir, element_name) == 'fetch needed'

            # Now fetch the source and check
            result = cli.run(project=project_dir, args=['source', 'fetch', element_name])
            result.assert_success()

            assert ("SUCCESS Fetching from {}"
                    .format(repo.source_config(ref=ref)['url'])) in result.stderr