summaryrefslogtreecommitdiff
path: root/tests/sources
diff options
context:
space:
mode:
authorChandan Singh <csingh43@bloomberg.net>2019-11-11 17:07:09 +0000
committerChandan Singh <chandan@chandansingh.net>2019-11-14 21:21:06 +0000
commit122177153b14664a0e4fed85aa4f22b87cfabf56 (patch)
tree032c2e46825af91f6fe27f22b5b567eea2b7935d /tests/sources
parenta3ee349558f36a220f79665873b36c1b0f990c8e (diff)
downloadbuildstream-122177153b14664a0e4fed85aa4f22b87cfabf56.tar.gz
Reformat code using Black
As discussed over the mailing list, reformat code using Black. This is a one-off change to reformat all our codebase. Moving forward, we shouldn't expect such blanket reformats. Rather, we expect each change to already comply with the Black formatting style.
Diffstat (limited to 'tests/sources')
-rw-r--r--tests/sources/bzr.py30
-rw-r--r--tests/sources/deb.py64
-rw-r--r--tests/sources/git.py990
-rw-r--r--tests/sources/keytest.py3
-rw-r--r--tests/sources/local.py165
-rw-r--r--tests/sources/no-fetch-cached/plugins/sources/always_cached.py1
-rw-r--r--tests/sources/no_fetch_cached.py25
-rw-r--r--tests/sources/patch.py93
-rw-r--r--tests/sources/pip.py46
-rw-r--r--tests/sources/previous_source_access.py31
-rw-r--r--tests/sources/previous_source_access/plugins/sources/foo_transform.py26
-rw-r--r--tests/sources/remote.py155
-rw-r--r--tests/sources/tar.py236
-rw-r--r--tests/sources/zip.py109
14 files changed, 834 insertions, 1140 deletions
diff --git a/tests/sources/bzr.py b/tests/sources/bzr.py
index c6e78f8c1..ca727c793 100644
--- a/tests/sources/bzr.py
+++ b/tests/sources/bzr.py
@@ -10,40 +10,32 @@ from buildstream.testing import cli # pylint: disable=unused-import
from buildstream.testing import create_repo
from buildstream.testing._utils.site import HAVE_BZR
-DATA_DIR = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'bzr'
-)
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "bzr")
@pytest.mark.skipif(HAVE_BZR is False, reason="bzr is not available")
@pytest.mark.datafiles(os.path.join(DATA_DIR))
def test_fetch_checkout(cli, tmpdir, datafiles):
project = str(datafiles)
- checkoutdir = os.path.join(str(tmpdir), 'checkout')
+ checkoutdir = os.path.join(str(tmpdir), "checkout")
- repo = create_repo('bzr', str(tmpdir))
- ref = repo.create(os.path.join(project, 'basic'))
+ repo = create_repo("bzr", str(tmpdir))
+ ref = repo.create(os.path.join(project, "basic"))
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- repo.source_config(ref=ref)
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [repo.source_config(ref=ref)]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Fetch, build, checkout
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
assert result.exit_code == 0
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
assert result.exit_code == 0
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
assert result.exit_code == 0
# Assert we checked out the file as it was commited
- with open(os.path.join(checkoutdir, 'test')) as f:
+ with open(os.path.join(checkoutdir, "test")) as f:
text = f.read()
- assert text == 'test\n'
+ assert text == "test\n"
diff --git a/tests/sources/deb.py b/tests/sources/deb.py
index e536e522a..96060a1a4 100644
--- a/tests/sources/deb.py
+++ b/tests/sources/deb.py
@@ -12,22 +12,14 @@ from buildstream.testing import cli # pylint: disable=unused-import
from buildstream.testing._utils.site import HAVE_ARPY
from . import list_dir_contents
-DATA_DIR = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'deb',
-)
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "deb",)
deb_name = "a_deb.deb"
def generate_project(project_dir, tmpdir):
project_file = os.path.join(project_dir, "project.conf")
- _yaml.roundtrip_dump({
- 'name': 'foo',
- 'aliases': {
- 'tmpdir': "file:///" + str(tmpdir)
- }
- }, project_file)
+ _yaml.roundtrip_dump({"name": "foo", "aliases": {"tmpdir": "file:///" + str(tmpdir)}}, project_file)
def _copy_deb(start_location, tmpdir):
@@ -38,31 +30,29 @@ def _copy_deb(start_location, tmpdir):
# Test that without ref, consistency is set appropriately.
@pytest.mark.skipif(HAVE_ARPY is False, reason="arpy is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-ref'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "no-ref"))
def test_no_ref(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
- assert cli.get_element_state(project, 'target.bst') == 'no reference'
+ assert cli.get_element_state(project, "target.bst") == "no reference"
# Test that when I fetch a nonexistent URL, errors are handled gracefully and a retry is performed.
@pytest.mark.skipif(HAVE_ARPY is False, reason="arpy is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_fetch_bad_url(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
# Try to fetch it
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
assert "FAILURE Try #" in result.stderr
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, None)
@pytest.mark.skipif(HAVE_ARPY is False, reason="arpy is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_fetch_bad_ref(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -71,16 +61,14 @@ def test_fetch_bad_ref(cli, tmpdir, datafiles):
_copy_deb(DATA_DIR, tmpdir)
# Try to fetch it
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, None)
# Test that when tracking with a ref set, there is a warning
@pytest.mark.skipif(HAVE_ARPY is False, reason="arpy is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_track_warning(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -89,16 +77,14 @@ def test_track_warning(cli, tmpdir, datafiles):
_copy_deb(DATA_DIR, tmpdir)
# Track it
- result = cli.run(project=project, args=[
- 'source', 'track', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
assert "Potential man-in-the-middle attack!" in result.stderr
# Test that a staged checkout matches what was tarred up, with the default first subdir
@pytest.mark.skipif(HAVE_ARPY is False, reason="arpy is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_stage_default_basedir(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -108,13 +94,13 @@ def test_stage_default_basedir(cli, tmpdir, datafiles):
_copy_deb(DATA_DIR, tmpdir)
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the content of the first directory is checked out (base-dir: '')
@@ -126,7 +112,7 @@ def test_stage_default_basedir(cli, tmpdir, datafiles):
# Test that a staged checkout matches what was tarred up, with an empty base-dir
@pytest.mark.skipif(HAVE_ARPY is False, reason="arpy is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-basedir'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "no-basedir"))
def test_stage_no_basedir(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -136,13 +122,13 @@ def test_stage_no_basedir(cli, tmpdir, datafiles):
_copy_deb(DATA_DIR, tmpdir)
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the full content of the tarball is checked out (base-dir: '')
@@ -154,7 +140,7 @@ def test_stage_no_basedir(cli, tmpdir, datafiles):
# Test that a staged checkout matches what was tarred up, with an explicit basedir
@pytest.mark.skipif(HAVE_ARPY is False, reason="arpy is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'explicit-basedir'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "explicit-basedir"))
def test_stage_explicit_basedir(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -164,13 +150,13 @@ def test_stage_explicit_basedir(cli, tmpdir, datafiles):
_copy_deb(DATA_DIR, tmpdir)
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the content of the first directory is checked out (base-dir: '')
diff --git a/tests/sources/git.py b/tests/sources/git.py
index 245f90131..fbb1394ec 100644
--- a/tests/sources/git.py
+++ b/tests/sources/git.py
@@ -36,354 +36,304 @@ from buildstream.testing import cli # pylint: disable=unused-import
from buildstream.testing import create_repo
from buildstream.testing._utils.site import HAVE_GIT, HAVE_OLD_GIT
-DATA_DIR = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'git',
-)
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "git",)
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
def test_fetch_bad_ref(cli, tmpdir, datafiles):
project = str(datafiles)
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ repo.create(os.path.join(project, "repofiles"))
# Write out our test target with a bad ref
- element = {
- 'kind': 'import',
- 'sources': [
- repo.source_config(ref='5')
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [repo.source_config(ref="5")]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Assert that fetch raises an error here
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, None)
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
def test_submodule_fetch_checkout(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create the submodule first from the 'subrepofiles' subdir
- subrepo = create_repo('git', str(tmpdir), 'subrepo')
- subrepo.create(os.path.join(project, 'subrepofiles'))
+ subrepo = create_repo("git", str(tmpdir), "subrepo")
+ subrepo.create(os.path.join(project, "subrepofiles"))
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ repo.create(os.path.join(project, "repofiles"))
# Add a submodule pointing to the one we created
- ref = repo.add_submodule('subdir', 'file://' + subrepo.repo)
+ ref = repo.add_submodule("subdir", "file://" + subrepo.repo)
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- repo.source_config(ref=ref)
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [repo.source_config(ref=ref)]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Fetch, build, checkout
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Assert we checked out both files at their expected location
- assert os.path.exists(os.path.join(checkoutdir, 'file.txt'))
- assert os.path.exists(os.path.join(checkoutdir, 'subdir', 'ponyfile.txt'))
+ assert os.path.exists(os.path.join(checkoutdir, "file.txt"))
+ assert os.path.exists(os.path.join(checkoutdir, "subdir", "ponyfile.txt"))
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
def test_submodule_fetch_source_enable_explicit(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create the submodule first from the 'subrepofiles' subdir
- subrepo = create_repo('git', str(tmpdir), 'subrepo')
- subrepo.create(os.path.join(project, 'subrepofiles'))
+ subrepo = create_repo("git", str(tmpdir), "subrepo")
+ subrepo.create(os.path.join(project, "subrepofiles"))
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ repo.create(os.path.join(project, "repofiles"))
# Add a submodule pointing to the one we created
- ref = repo.add_submodule('subdir', 'file://' + subrepo.repo)
+ ref = repo.add_submodule("subdir", "file://" + subrepo.repo)
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- repo.source_config_extra(ref=ref, checkout_submodules=True)
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [repo.source_config_extra(ref=ref, checkout_submodules=True)]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Fetch, build, checkout
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Assert we checked out both files at their expected location
- assert os.path.exists(os.path.join(checkoutdir, 'file.txt'))
- assert os.path.exists(os.path.join(checkoutdir, 'subdir', 'ponyfile.txt'))
+ assert os.path.exists(os.path.join(checkoutdir, "file.txt"))
+ assert os.path.exists(os.path.join(checkoutdir, "subdir", "ponyfile.txt"))
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
def test_submodule_fetch_source_disable(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create the submodule first from the 'subrepofiles' subdir
- subrepo = create_repo('git', str(tmpdir), 'subrepo')
- subrepo.create(os.path.join(project, 'subrepofiles'))
+ subrepo = create_repo("git", str(tmpdir), "subrepo")
+ subrepo.create(os.path.join(project, "subrepofiles"))
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ repo.create(os.path.join(project, "repofiles"))
# Add a submodule pointing to the one we created
- ref = repo.add_submodule('subdir', 'file://' + subrepo.repo)
+ ref = repo.add_submodule("subdir", "file://" + subrepo.repo)
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- repo.source_config_extra(ref=ref, checkout_submodules=False)
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [repo.source_config_extra(ref=ref, checkout_submodules=False)]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Fetch, build, checkout
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Assert we checked out both files at their expected location
- assert os.path.exists(os.path.join(checkoutdir, 'file.txt'))
- assert not os.path.exists(os.path.join(checkoutdir, 'subdir', 'ponyfile.txt'))
+ assert os.path.exists(os.path.join(checkoutdir, "file.txt"))
+ assert not os.path.exists(os.path.join(checkoutdir, "subdir", "ponyfile.txt"))
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
def test_submodule_fetch_submodule_does_override(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create the submodule first from the 'subrepofiles' subdir
- subrepo = create_repo('git', str(tmpdir), 'subrepo')
- subrepo.create(os.path.join(project, 'subrepofiles'))
+ subrepo = create_repo("git", str(tmpdir), "subrepo")
+ subrepo.create(os.path.join(project, "subrepofiles"))
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ repo.create(os.path.join(project, "repofiles"))
# Add a submodule pointing to the one we created
- ref = repo.add_submodule('subdir', 'file://' + subrepo.repo, checkout=True)
+ ref = repo.add_submodule("subdir", "file://" + subrepo.repo, checkout=True)
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- repo.source_config_extra(ref=ref, checkout_submodules=False)
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [repo.source_config_extra(ref=ref, checkout_submodules=False)]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Fetch, build, checkout
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Assert we checked out both files at their expected location
- assert os.path.exists(os.path.join(checkoutdir, 'file.txt'))
- assert os.path.exists(os.path.join(checkoutdir, 'subdir', 'ponyfile.txt'))
+ assert os.path.exists(os.path.join(checkoutdir, "file.txt"))
+ assert os.path.exists(os.path.join(checkoutdir, "subdir", "ponyfile.txt"))
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
def test_submodule_fetch_submodule_individual_checkout(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create the submodule first from the 'subrepofiles' subdir
- subrepo = create_repo('git', str(tmpdir), 'subrepo')
- subrepo.create(os.path.join(project, 'subrepofiles'))
+ subrepo = create_repo("git", str(tmpdir), "subrepo")
+ subrepo.create(os.path.join(project, "subrepofiles"))
# Create another submodule from the 'othersubrepofiles' subdir
- other_subrepo = create_repo('git', str(tmpdir), 'othersubrepo')
- other_subrepo.create(os.path.join(project, 'othersubrepofiles'))
+ other_subrepo = create_repo("git", str(tmpdir), "othersubrepo")
+ other_subrepo.create(os.path.join(project, "othersubrepofiles"))
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ repo.create(os.path.join(project, "repofiles"))
# Add a submodule pointing to the one we created
- repo.add_submodule('subdir', 'file://' + subrepo.repo, checkout=False)
- ref = repo.add_submodule('othersubdir', 'file://' + other_subrepo.repo)
+ repo.add_submodule("subdir", "file://" + subrepo.repo, checkout=False)
+ ref = repo.add_submodule("othersubdir", "file://" + other_subrepo.repo)
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- repo.source_config_extra(ref=ref, checkout_submodules=True)
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [repo.source_config_extra(ref=ref, checkout_submodules=True)]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Fetch, build, checkout
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Assert we checked out files at their expected location
- assert os.path.exists(os.path.join(checkoutdir, 'file.txt'))
- assert not os.path.exists(os.path.join(checkoutdir, 'subdir', 'ponyfile.txt'))
- assert os.path.exists(os.path.join(checkoutdir, 'othersubdir', 'unicornfile.txt'))
+ assert os.path.exists(os.path.join(checkoutdir, "file.txt"))
+ assert not os.path.exists(os.path.join(checkoutdir, "subdir", "ponyfile.txt"))
+ assert os.path.exists(os.path.join(checkoutdir, "othersubdir", "unicornfile.txt"))
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
def test_submodule_fetch_submodule_individual_checkout_explicit(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create the submodule first from the 'subrepofiles' subdir
- subrepo = create_repo('git', str(tmpdir), 'subrepo')
- subrepo.create(os.path.join(project, 'subrepofiles'))
+ subrepo = create_repo("git", str(tmpdir), "subrepo")
+ subrepo.create(os.path.join(project, "subrepofiles"))
# Create another submodule from the 'othersubrepofiles' subdir
- other_subrepo = create_repo('git', str(tmpdir), 'othersubrepo')
- other_subrepo.create(os.path.join(project, 'othersubrepofiles'))
+ other_subrepo = create_repo("git", str(tmpdir), "othersubrepo")
+ other_subrepo.create(os.path.join(project, "othersubrepofiles"))
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ repo.create(os.path.join(project, "repofiles"))
# Add a submodule pointing to the one we created
- repo.add_submodule('subdir', 'file://' + subrepo.repo, checkout=False)
- ref = repo.add_submodule('othersubdir', 'file://' + other_subrepo.repo, checkout=True)
+ repo.add_submodule("subdir", "file://" + subrepo.repo, checkout=False)
+ ref = repo.add_submodule("othersubdir", "file://" + other_subrepo.repo, checkout=True)
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- repo.source_config_extra(ref=ref, checkout_submodules=True)
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [repo.source_config_extra(ref=ref, checkout_submodules=True)]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Fetch, build, checkout
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Assert we checked out files at their expected location
- assert os.path.exists(os.path.join(checkoutdir, 'file.txt'))
- assert not os.path.exists(os.path.join(checkoutdir, 'subdir', 'ponyfile.txt'))
- assert os.path.exists(os.path.join(checkoutdir, 'othersubdir', 'unicornfile.txt'))
+ assert os.path.exists(os.path.join(checkoutdir, "file.txt"))
+ assert not os.path.exists(os.path.join(checkoutdir, "subdir", "ponyfile.txt"))
+ assert os.path.exists(os.path.join(checkoutdir, "othersubdir", "unicornfile.txt"))
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'project-override'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "project-override"))
def test_submodule_fetch_project_override(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Create the submodule first from the 'subrepofiles' subdir
- subrepo = create_repo('git', str(tmpdir), 'subrepo')
- subrepo.create(os.path.join(project, 'subrepofiles'))
+ subrepo = create_repo("git", str(tmpdir), "subrepo")
+ subrepo.create(os.path.join(project, "subrepofiles"))
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ repo.create(os.path.join(project, "repofiles"))
# Add a submodule pointing to the one we created
- ref = repo.add_submodule('subdir', 'file://' + subrepo.repo)
+ ref = repo.add_submodule("subdir", "file://" + subrepo.repo)
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- repo.source_config(ref=ref)
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [repo.source_config(ref=ref)]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Fetch, build, checkout
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Assert we checked out both files at their expected location
- assert os.path.exists(os.path.join(checkoutdir, 'file.txt'))
- assert not os.path.exists(os.path.join(checkoutdir, 'subdir', 'ponyfile.txt'))
+ assert os.path.exists(os.path.join(checkoutdir, "file.txt"))
+ assert not os.path.exists(os.path.join(checkoutdir, "subdir", "ponyfile.txt"))
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
def test_submodule_track_ignore_inconsistent(cli, tmpdir, datafiles):
project = str(datafiles)
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- ref = repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ ref = repo.create(os.path.join(project, "repofiles"))
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- repo.source_config(ref=ref)
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [repo.source_config(ref=ref)]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Now add a .gitmodules file with an inconsistent submodule,
# we are calling this inconsistent because the file was created
# but `git submodule add` was never called, so there is no reference
# associated to the submodule.
#
- repo.add_file(os.path.join(project, 'inconsistent-submodule', '.gitmodules'))
+ repo.add_file(os.path.join(project, "inconsistent-submodule", ".gitmodules"))
# Fetch should work, we're not yet at the offending ref
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
# Track will encounter an inconsistent submodule without any ref
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
# Assert that we are just fine without it, and emit a warning to the user.
@@ -391,68 +341,55 @@ def test_submodule_track_ignore_inconsistent(cli, tmpdir, datafiles):
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
def test_submodule_track_no_ref_or_track(cli, tmpdir, datafiles):
project = str(datafiles)
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ repo.create(os.path.join(project, "repofiles"))
# Write out our test target
gitsource = repo.source_config(ref=None)
- gitsource.pop('track')
- element = {
- 'kind': 'import',
- 'sources': [
- gitsource
- ]
- }
+ gitsource.pop("track")
+ element = {"kind": "import", "sources": [gitsource]}
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Track will encounter an inconsistent submodule without any ref
- result = cli.run(project=project, args=['show', 'target.bst'])
+ result = cli.run(project=project, args=["show", "target.bst"])
result.assert_main_error(ErrorDomain.SOURCE, "missing-track-and-ref")
result.assert_task_error(None, None)
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
-@pytest.mark.parametrize("fail", ['warn', 'error'])
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
+@pytest.mark.parametrize("fail", ["warn", "error"])
def test_ref_not_in_track(cli, tmpdir, datafiles, fail):
project = str(datafiles)
# Make the warning an error if we're testing errors
- if fail == 'error':
- project_template = {
- "name": "foo",
- "fatal-warnings": [CoreWarnings.REF_NOT_IN_TRACK]
- }
- _yaml.roundtrip_dump(project_template, os.path.join(project, 'project.conf'))
+ if fail == "error":
+ project_template = {"name": "foo", "fatal-warnings": [CoreWarnings.REF_NOT_IN_TRACK]}
+ _yaml.roundtrip_dump(project_template, os.path.join(project, "project.conf"))
# Create the repo from 'repofiles', create a branch without latest commit
- repo = create_repo('git', str(tmpdir))
- ref = repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ ref = repo.create(os.path.join(project, "repofiles"))
gitsource = repo.source_config(ref=ref)
# Overwrite the track value to the added branch
- gitsource['track'] = 'foo'
+ gitsource["track"] = "foo"
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- gitsource
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [gitsource]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
# Assert a warning or an error depending on what we're checking
- if fail == 'error':
+ if fail == "error":
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.PLUGIN, CoreWarnings.REF_NOT_IN_TRACK)
else:
@@ -461,29 +398,26 @@ def test_ref_not_in_track(cli, tmpdir, datafiles, fail):
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
-@pytest.mark.parametrize("fail", ['warn', 'error'])
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
+@pytest.mark.parametrize("fail", ["warn", "error"])
def test_unlisted_submodule(cli, tmpdir, datafiles, fail):
project = str(datafiles)
# Make the warning an error if we're testing errors
- if fail == 'error':
- project_template = {
- "name": "foo",
- "fatal-warnings": ['git:unlisted-submodule']
- }
- _yaml.roundtrip_dump(project_template, os.path.join(project, 'project.conf'))
+ if fail == "error":
+ project_template = {"name": "foo", "fatal-warnings": ["git:unlisted-submodule"]}
+ _yaml.roundtrip_dump(project_template, os.path.join(project, "project.conf"))
# Create the submodule first from the 'subrepofiles' subdir
- subrepo = create_repo('git', str(tmpdir), 'subrepo')
- subrepo.create(os.path.join(project, 'subrepofiles'))
+ subrepo = create_repo("git", str(tmpdir), "subrepo")
+ subrepo.create(os.path.join(project, "subrepofiles"))
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ repo.create(os.path.join(project, "repofiles"))
# Add a submodule pointing to the one we created
- ref = repo.add_submodule('subdir', 'file://' + subrepo.repo)
+ ref = repo.add_submodule("subdir", "file://" + subrepo.repo)
# Create the source, and delete the explicit configuration
# of the submodules.
@@ -492,127 +426,111 @@ def test_unlisted_submodule(cli, tmpdir, datafiles, fail):
# after the source has been fetched.
#
gitsource = repo.source_config(ref=ref)
- del gitsource['submodules']
+ del gitsource["submodules"]
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- gitsource
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [gitsource]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# We will not see the warning or error before the first fetch, because
# we don't have the repository yet and so we have no knowledge of
# the unlisted submodule.
- result = cli.run(project=project, args=['show', 'target.bst'])
+ result = cli.run(project=project, args=["show", "target.bst"])
result.assert_success()
assert "git:unlisted-submodule" not in result.stderr
# We will notice this directly in fetch, as it will try to fetch
# the submodules it discovers as a result of fetching the primary repo.
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
# Assert a warning or an error depending on what we're checking
- if fail == 'error':
+ if fail == "error":
result.assert_main_error(ErrorDomain.STREAM, None)
- result.assert_task_error(ErrorDomain.PLUGIN, 'git:unlisted-submodule')
+ result.assert_task_error(ErrorDomain.PLUGIN, "git:unlisted-submodule")
else:
result.assert_success()
assert "git:unlisted-submodule" in result.stderr
# Now that we've fetched it, `bst show` will discover the unlisted submodule too
- result = cli.run(project=project, args=['show', 'target.bst'])
+ result = cli.run(project=project, args=["show", "target.bst"])
# Assert a warning or an error depending on what we're checking
- if fail == 'error':
- result.assert_main_error(ErrorDomain.PLUGIN, 'git:unlisted-submodule')
+ if fail == "error":
+ result.assert_main_error(ErrorDomain.PLUGIN, "git:unlisted-submodule")
else:
result.assert_success()
assert "git:unlisted-submodule" in result.stderr
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
-@pytest.mark.parametrize("fail", ['warn', 'error'])
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
+@pytest.mark.parametrize("fail", ["warn", "error"])
def test_track_unlisted_submodule(cli, tmpdir, datafiles, fail):
project = str(datafiles)
# Make the warning an error if we're testing errors
- if fail == 'error':
- project_template = {
- "name": "foo",
- "fatal-warnings": ['git:unlisted-submodule']
- }
- _yaml.roundtrip_dump(project_template, os.path.join(project, 'project.conf'))
+ if fail == "error":
+ project_template = {"name": "foo", "fatal-warnings": ["git:unlisted-submodule"]}
+ _yaml.roundtrip_dump(project_template, os.path.join(project, "project.conf"))
# Create the submodule first from the 'subrepofiles' subdir
- subrepo = create_repo('git', str(tmpdir), 'subrepo')
- subrepo.create(os.path.join(project, 'subrepofiles'))
+ subrepo = create_repo("git", str(tmpdir), "subrepo")
+ subrepo.create(os.path.join(project, "subrepofiles"))
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- ref = repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ ref = repo.create(os.path.join(project, "repofiles"))
# Add a submodule pointing to the one we created, but use
# the original ref, let the submodules appear after tracking
- repo.add_submodule('subdir', 'file://' + subrepo.repo)
+ repo.add_submodule("subdir", "file://" + subrepo.repo)
# Create the source, and delete the explicit configuration
# of the submodules.
gitsource = repo.source_config(ref=ref)
- del gitsource['submodules']
+ del gitsource["submodules"]
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- gitsource
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [gitsource]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Fetch the repo, we will not see the warning because we
# are still pointing to a ref which predates the submodules
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
assert "git:unlisted-submodule" not in result.stderr
# We won't get a warning/error when tracking either, the source
# has not become Consistency.CACHED so the opportunity to check
# for the warning has not yet arisen.
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
assert "git:unlisted-submodule" not in result.stderr
# Fetching the repo at the new ref will finally reveal the warning
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
- if fail == 'error':
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
+ if fail == "error":
result.assert_main_error(ErrorDomain.STREAM, None)
- result.assert_task_error(ErrorDomain.PLUGIN, 'git:unlisted-submodule')
+ result.assert_task_error(ErrorDomain.PLUGIN, "git:unlisted-submodule")
else:
result.assert_success()
assert "git:unlisted-submodule" in result.stderr
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
-@pytest.mark.parametrize("fail", ['warn', 'error'])
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
+@pytest.mark.parametrize("fail", ["warn", "error"])
def test_invalid_submodule(cli, tmpdir, datafiles, fail):
project = str(datafiles)
# Make the warning an error if we're testing errors
- if fail == 'error':
- project_template = {
- "name": "foo",
- "fatal-warnings": ['git:invalid-submodule']
- }
- _yaml.roundtrip_dump(project_template, os.path.join(project, 'project.conf'))
+ if fail == "error":
+ project_template = {"name": "foo", "fatal-warnings": ["git:invalid-submodule"]}
+ _yaml.roundtrip_dump(project_template, os.path.join(project, "project.conf"))
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- ref = repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ ref = repo.create(os.path.join(project, "repofiles"))
# Create the source without any submodules, and add
# an invalid submodule configuration to it.
@@ -622,46 +540,37 @@ def test_invalid_submodule(cli, tmpdir, datafiles, fail):
# the real submodules actually are.
#
gitsource = repo.source_config(ref=ref)
- gitsource['submodules'] = {
- 'subdir': {
- 'url': 'https://pony.org/repo.git'
- }
- }
+ gitsource["submodules"] = {"subdir": {"url": "https://pony.org/repo.git"}}
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- gitsource
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [gitsource]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# We will not see the warning or error before the first fetch, because
# we don't have the repository yet and so we have no knowledge of
# the unlisted submodule.
- result = cli.run(project=project, args=['show', 'target.bst'])
+ result = cli.run(project=project, args=["show", "target.bst"])
result.assert_success()
assert "git:invalid-submodule" not in result.stderr
# We will notice this directly in fetch, as it will try to fetch
# the submodules it discovers as a result of fetching the primary repo.
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
# Assert a warning or an error depending on what we're checking
- if fail == 'error':
+ if fail == "error":
result.assert_main_error(ErrorDomain.STREAM, None)
- result.assert_task_error(ErrorDomain.PLUGIN, 'git:invalid-submodule')
+ result.assert_task_error(ErrorDomain.PLUGIN, "git:invalid-submodule")
else:
result.assert_success()
assert "git:invalid-submodule" in result.stderr
# Now that we've fetched it, `bst show` will discover the unlisted submodule too
- result = cli.run(project=project, args=['show', 'target.bst'])
+ result = cli.run(project=project, args=["show", "target.bst"])
# Assert a warning or an error depending on what we're checking
- if fail == 'error':
- result.assert_main_error(ErrorDomain.PLUGIN, 'git:invalid-submodule')
+ if fail == "error":
+ result.assert_main_error(ErrorDomain.PLUGIN, "git:invalid-submodule")
else:
result.assert_success()
assert "git:invalid-submodule" in result.stderr
@@ -669,49 +578,41 @@ def test_invalid_submodule(cli, tmpdir, datafiles, fail):
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
@pytest.mark.skipif(HAVE_OLD_GIT, reason="old git rm does not update .gitmodules")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
-@pytest.mark.parametrize("fail", ['warn', 'error'])
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
+@pytest.mark.parametrize("fail", ["warn", "error"])
def test_track_invalid_submodule(cli, tmpdir, datafiles, fail):
project = str(datafiles)
# Make the warning an error if we're testing errors
- if fail == 'error':
- project_template = {
- "name": "foo",
- "fatal-warnings": ['git:invalid-submodule']
- }
- _yaml.roundtrip_dump(project_template, os.path.join(project, 'project.conf'))
+ if fail == "error":
+ project_template = {"name": "foo", "fatal-warnings": ["git:invalid-submodule"]}
+ _yaml.roundtrip_dump(project_template, os.path.join(project, "project.conf"))
# Create the submodule first from the 'subrepofiles' subdir
- subrepo = create_repo('git', str(tmpdir), 'subrepo')
- subrepo.create(os.path.join(project, 'subrepofiles'))
+ subrepo = create_repo("git", str(tmpdir), "subrepo")
+ subrepo.create(os.path.join(project, "subrepofiles"))
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ repo.create(os.path.join(project, "repofiles"))
# Add a submodule pointing to the one we created
- ref = repo.add_submodule('subdir', 'file://' + subrepo.repo)
+ ref = repo.add_submodule("subdir", "file://" + subrepo.repo)
# Add a commit beyond the ref which *removes* the submodule we've added
- repo.remove_path('subdir')
+ repo.remove_path("subdir")
# Create the source, this will keep the submodules so initially
# the configuration is valid for the ref we're using
gitsource = repo.source_config(ref=ref)
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- gitsource
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [gitsource]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Fetch the repo, we will not see the warning because we
# are still pointing to a ref which predates the submodules
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
assert "git:invalid-submodule" not in result.stderr
@@ -720,417 +621,392 @@ def test_track_invalid_submodule(cli, tmpdir, datafiles, fail):
# not locally cached, the Source will be CACHED directly after
# tracking and the validations will occur as a result.
#
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
- if fail == 'error':
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
+ if fail == "error":
result.assert_main_error(ErrorDomain.STREAM, None)
- result.assert_task_error(ErrorDomain.PLUGIN, 'git:invalid-submodule')
+ result.assert_task_error(ErrorDomain.PLUGIN, "git:invalid-submodule")
else:
result.assert_success()
assert "git:invalid-submodule" in result.stderr
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
-@pytest.mark.parametrize("ref_format", ['sha1', 'git-describe'])
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
+@pytest.mark.parametrize("ref_format", ["sha1", "git-describe"])
@pytest.mark.parametrize("tag,extra_commit", [(False, False), (True, False), (True, True)])
def test_track_fetch(cli, tmpdir, datafiles, ref_format, tag, extra_commit):
project = str(datafiles)
# Create the repo from 'repofiles' subdir
- repo = create_repo('git', str(tmpdir))
- repo.create(os.path.join(project, 'repofiles'))
+ repo = create_repo("git", str(tmpdir))
+ repo.create(os.path.join(project, "repofiles"))
if tag:
- repo.add_tag('tag')
+ repo.add_tag("tag")
if extra_commit:
repo.add_commit()
# Write out our test target
- element = {
- 'kind': 'import',
- 'sources': [
- repo.source_config()
- ]
- }
- element['sources'][0]['ref-format'] = ref_format
- element_path = os.path.join(project, 'target.bst')
+ element = {"kind": "import", "sources": [repo.source_config()]}
+ element["sources"][0]["ref-format"] = ref_format
+ element_path = os.path.join(project, "target.bst")
_yaml.roundtrip_dump(element, element_path)
# Track it
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
element = _yaml.load(element_path)
- new_ref = element.get_sequence('sources').mapping_at(0).get_str('ref')
+ new_ref = element.get_sequence("sources").mapping_at(0).get_str("ref")
- if ref_format == 'git-describe' and tag:
+ if ref_format == "git-describe" and tag:
# Check and strip prefix
- prefix = 'tag-{}-g'.format(0 if not extra_commit else 1)
+ prefix = "tag-{}-g".format(0 if not extra_commit else 1)
assert new_ref.startswith(prefix)
- new_ref = new_ref[len(prefix):]
+ new_ref = new_ref[len(prefix) :]
# 40 chars for SHA-1
assert len(new_ref) == 40
# Fetch it
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
@pytest.mark.skipif(HAVE_OLD_GIT, reason="old git describe lacks --first-parent")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
-@pytest.mark.parametrize("ref_storage", [('inline'), ('project.refs')])
-@pytest.mark.parametrize("tag_type", [('annotated'), ('lightweight')])
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
+@pytest.mark.parametrize("ref_storage", [("inline"), ("project.refs")])
+@pytest.mark.parametrize("tag_type", [("annotated"), ("lightweight")])
def test_git_describe(cli, tmpdir, datafiles, ref_storage, tag_type):
project = str(datafiles)
- project_config = _yaml.load(os.path.join(project, 'project.conf'))
- project_config['ref-storage'] = ref_storage
- _yaml.roundtrip_dump(project_config, os.path.join(project, 'project.conf'))
+ project_config = _yaml.load(os.path.join(project, "project.conf"))
+ project_config["ref-storage"] = ref_storage
+ _yaml.roundtrip_dump(project_config, os.path.join(project, "project.conf"))
- repofiles = os.path.join(str(tmpdir), 'repofiles')
+ repofiles = os.path.join(str(tmpdir), "repofiles")
os.makedirs(repofiles, exist_ok=True)
- file0 = os.path.join(repofiles, 'file0')
- with open(file0, 'w') as f:
- f.write('test\n')
+ file0 = os.path.join(repofiles, "file0")
+ with open(file0, "w") as f:
+ f.write("test\n")
- repo = create_repo('git', str(tmpdir))
+ repo = create_repo("git", str(tmpdir))
def tag(name):
- if tag_type == 'annotated':
+ if tag_type == "annotated":
repo.add_annotated_tag(name, name)
else:
repo.add_tag(name)
repo.create(repofiles)
- tag('uselesstag')
+ tag("uselesstag")
- file1 = os.path.join(str(tmpdir), 'file1')
- with open(file1, 'w') as f:
- f.write('test\n')
+ file1 = os.path.join(str(tmpdir), "file1")
+ with open(file1, "w") as f:
+ f.write("test\n")
repo.add_file(file1)
- tag('tag1')
+ tag("tag1")
- file2 = os.path.join(str(tmpdir), 'file2')
- with open(file2, 'w') as f:
- f.write('test\n')
- repo.branch('branch2')
+ file2 = os.path.join(str(tmpdir), "file2")
+ with open(file2, "w") as f:
+ f.write("test\n")
+ repo.branch("branch2")
repo.add_file(file2)
- tag('tag2')
+ tag("tag2")
- repo.checkout('master')
- file3 = os.path.join(str(tmpdir), 'file3')
- with open(file3, 'w') as f:
- f.write('test\n')
+ repo.checkout("master")
+ file3 = os.path.join(str(tmpdir), "file3")
+ with open(file3, "w") as f:
+ f.write("test\n")
repo.add_file(file3)
- repo.merge('branch2')
+ repo.merge("branch2")
config = repo.source_config()
- config['track'] = repo.latest_commit()
- config['track-tags'] = True
+ config["track"] = repo.latest_commit()
+ config["track-tags"] = True
# Write out our test target
element = {
- 'kind': 'import',
- 'sources': [
- config
- ],
+ "kind": "import",
+ "sources": [config],
}
- element_path = os.path.join(project, 'target.bst')
+ element_path = os.path.join(project, "target.bst")
_yaml.roundtrip_dump(element, element_path)
- if ref_storage == 'inline':
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ if ref_storage == "inline":
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
else:
- result = cli.run(project=project, args=['source', 'track', 'target.bst', '--deps', 'all'])
+ result = cli.run(project=project, args=["source", "track", "target.bst", "--deps", "all"])
result.assert_success()
- if ref_storage == 'inline':
+ if ref_storage == "inline":
element = _yaml.load(element_path)
- tags = element.get_sequence('sources').mapping_at(0).get_sequence('tags')
+ tags = element.get_sequence("sources").mapping_at(0).get_sequence("tags")
assert len(tags) == 2
for tag in tags:
- assert 'tag' in tag
- assert 'commit' in tag
- assert 'annotated' in tag
- assert tag.get_bool('annotated') == (tag_type == 'annotated')
-
- assert {(tag.get_str('tag'),
- tag.get_str('commit'))
- for tag in tags} == {('tag1', repo.rev_parse('tag1^{commit}')),
- ('tag2', repo.rev_parse('tag2^{commit}'))}
+ assert "tag" in tag
+ assert "commit" in tag
+ assert "annotated" in tag
+ assert tag.get_bool("annotated") == (tag_type == "annotated")
+
+ assert {(tag.get_str("tag"), tag.get_str("commit")) for tag in tags} == {
+ ("tag1", repo.rev_parse("tag1^{commit}")),
+ ("tag2", repo.rev_parse("tag2^{commit}")),
+ }
- checkout = os.path.join(str(tmpdir), 'checkout')
+ checkout = os.path.join(str(tmpdir), "checkout")
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkout])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkout])
result.assert_success()
- if tag_type == 'annotated':
+ if tag_type == "annotated":
options = []
else:
- options = ['--tags']
- describe = subprocess.check_output(['git', 'describe', *options],
- cwd=checkout, universal_newlines=True)
- assert describe.startswith('tag2-2-')
+ options = ["--tags"]
+ describe = subprocess.check_output(["git", "describe", *options], cwd=checkout, universal_newlines=True)
+ assert describe.startswith("tag2-2-")
- describe_fp = subprocess.check_output(['git', 'describe', '--first-parent', *options],
- cwd=checkout, universal_newlines=True)
- assert describe_fp.startswith('tag1-2-')
+ describe_fp = subprocess.check_output(
+ ["git", "describe", "--first-parent", *options], cwd=checkout, universal_newlines=True
+ )
+ assert describe_fp.startswith("tag1-2-")
- tags = subprocess.check_output(['git', 'tag'],
- cwd=checkout, universal_newlines=True)
+ tags = subprocess.check_output(["git", "tag"], cwd=checkout, universal_newlines=True)
tags = set(tags.splitlines())
- assert tags == set(['tag1', 'tag2'])
+ assert tags == set(["tag1", "tag2"])
- p = subprocess.run(['git', 'log', repo.rev_parse('uselesstag')],
- cwd=checkout)
+ p = subprocess.run(["git", "log", repo.rev_parse("uselesstag")], cwd=checkout)
assert p.returncode != 0
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
-@pytest.mark.parametrize("ref_storage", [('inline'), ('project.refs')])
-@pytest.mark.parametrize("tag_type", [('annotated'), ('lightweight')])
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
+@pytest.mark.parametrize("ref_storage", [("inline"), ("project.refs")])
+@pytest.mark.parametrize("tag_type", [("annotated"), ("lightweight")])
def test_git_describe_head_is_tagged(cli, tmpdir, datafiles, ref_storage, tag_type):
project = str(datafiles)
- project_config = _yaml.load(os.path.join(project, 'project.conf'))
- project_config['ref-storage'] = ref_storage
- _yaml.roundtrip_dump(project_config, os.path.join(project, 'project.conf'))
+ project_config = _yaml.load(os.path.join(project, "project.conf"))
+ project_config["ref-storage"] = ref_storage
+ _yaml.roundtrip_dump(project_config, os.path.join(project, "project.conf"))
- repofiles = os.path.join(str(tmpdir), 'repofiles')
+ repofiles = os.path.join(str(tmpdir), "repofiles")
os.makedirs(repofiles, exist_ok=True)
- file0 = os.path.join(repofiles, 'file0')
- with open(file0, 'w') as f:
- f.write('test\n')
+ file0 = os.path.join(repofiles, "file0")
+ with open(file0, "w") as f:
+ f.write("test\n")
- repo = create_repo('git', str(tmpdir))
+ repo = create_repo("git", str(tmpdir))
def tag(name):
- if tag_type == 'annotated':
+ if tag_type == "annotated":
repo.add_annotated_tag(name, name)
else:
repo.add_tag(name)
repo.create(repofiles)
- tag('uselesstag')
+ tag("uselesstag")
- file1 = os.path.join(str(tmpdir), 'file1')
- with open(file1, 'w') as f:
- f.write('test\n')
+ file1 = os.path.join(str(tmpdir), "file1")
+ with open(file1, "w") as f:
+ f.write("test\n")
repo.add_file(file1)
- file2 = os.path.join(str(tmpdir), 'file2')
- with open(file2, 'w') as f:
- f.write('test\n')
- repo.branch('branch2')
+ file2 = os.path.join(str(tmpdir), "file2")
+ with open(file2, "w") as f:
+ f.write("test\n")
+ repo.branch("branch2")
repo.add_file(file2)
- repo.checkout('master')
- file3 = os.path.join(str(tmpdir), 'file3')
- with open(file3, 'w') as f:
- f.write('test\n')
+ repo.checkout("master")
+ file3 = os.path.join(str(tmpdir), "file3")
+ with open(file3, "w") as f:
+ f.write("test\n")
repo.add_file(file3)
- tagged_ref = repo.merge('branch2')
- tag('tag')
+ tagged_ref = repo.merge("branch2")
+ tag("tag")
config = repo.source_config()
- config['track'] = repo.latest_commit()
- config['track-tags'] = True
+ config["track"] = repo.latest_commit()
+ config["track-tags"] = True
# Write out our test target
element = {
- 'kind': 'import',
- 'sources': [
- config
- ],
+ "kind": "import",
+ "sources": [config],
}
- element_path = os.path.join(project, 'target.bst')
+ element_path = os.path.join(project, "target.bst")
_yaml.roundtrip_dump(element, element_path)
- if ref_storage == 'inline':
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ if ref_storage == "inline":
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
else:
- result = cli.run(project=project, args=['source', 'track', 'target.bst', '--deps', 'all'])
+ result = cli.run(project=project, args=["source", "track", "target.bst", "--deps", "all"])
result.assert_success()
- if ref_storage == 'inline':
+ if ref_storage == "inline":
element = _yaml.load(element_path)
- source = element.get_sequence('sources').mapping_at(0)
- tags = source.get_sequence('tags')
+ source = element.get_sequence("sources").mapping_at(0)
+ tags = source.get_sequence("tags")
assert len(tags) == 1
- tag = source.get_sequence('tags').mapping_at(0)
- assert 'tag' in tag
- assert 'commit' in tag
- assert 'annotated' in tag
- assert tag.get_bool('annotated') == (tag_type == 'annotated')
+ tag = source.get_sequence("tags").mapping_at(0)
+ assert "tag" in tag
+ assert "commit" in tag
+ assert "annotated" in tag
+ assert tag.get_bool("annotated") == (tag_type == "annotated")
- tag_name = tag.get_str('tag')
- commit = tag.get_str('commit')
- assert (tag_name, commit) == ('tag', repo.rev_parse('tag^{commit}'))
+ tag_name = tag.get_str("tag")
+ commit = tag.get_str("commit")
+ assert (tag_name, commit) == ("tag", repo.rev_parse("tag^{commit}"))
- checkout = os.path.join(str(tmpdir), 'checkout')
+ checkout = os.path.join(str(tmpdir), "checkout")
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkout])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkout])
result.assert_success()
- if tag_type == 'annotated':
+ if tag_type == "annotated":
options = []
else:
- options = ['--tags']
- describe = subprocess.check_output(['git', 'describe', *options],
- cwd=checkout, universal_newlines=True)
- assert describe.startswith('tag')
-
- tags = subprocess.check_output(['git', 'tag'],
- cwd=checkout,
- universal_newlines=True)
+ options = ["--tags"]
+ describe = subprocess.check_output(["git", "describe", *options], cwd=checkout, universal_newlines=True)
+ assert describe.startswith("tag")
+
+ tags = subprocess.check_output(["git", "tag"], cwd=checkout, universal_newlines=True)
tags = set(tags.splitlines())
- assert tags == set(['tag'])
+ assert tags == set(["tag"])
- rev_list = subprocess.check_output(['git', 'rev-list', '--all'],
- cwd=checkout,
- universal_newlines=True)
+ rev_list = subprocess.check_output(["git", "rev-list", "--all"], cwd=checkout, universal_newlines=True)
assert set(rev_list.splitlines()) == set([tagged_ref])
- p = subprocess.run(['git', 'log', repo.rev_parse('uselesstag')],
- cwd=checkout)
+ p = subprocess.run(["git", "log", repo.rev_parse("uselesstag")], cwd=checkout)
assert p.returncode != 0
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
def test_git_describe_relevant_history(cli, tmpdir, datafiles):
project = str(datafiles)
- project_config = _yaml.load(os.path.join(project, 'project.conf'))
- project_config['ref-storage'] = 'project.refs'
- _yaml.roundtrip_dump(project_config, os.path.join(project, 'project.conf'))
+ project_config = _yaml.load(os.path.join(project, "project.conf"))
+ project_config["ref-storage"] = "project.refs"
+ _yaml.roundtrip_dump(project_config, os.path.join(project, "project.conf"))
- repofiles = os.path.join(str(tmpdir), 'repofiles')
+ repofiles = os.path.join(str(tmpdir), "repofiles")
os.makedirs(repofiles, exist_ok=True)
- file0 = os.path.join(repofiles, 'file0')
- with open(file0, 'w') as f:
- f.write('test\n')
+ file0 = os.path.join(repofiles, "file0")
+ with open(file0, "w") as f:
+ f.write("test\n")
- repo = create_repo('git', str(tmpdir))
+ repo = create_repo("git", str(tmpdir))
repo.create(repofiles)
- file1 = os.path.join(str(tmpdir), 'file1')
- with open(file1, 'w') as f:
- f.write('test\n')
+ file1 = os.path.join(str(tmpdir), "file1")
+ with open(file1, "w") as f:
+ f.write("test\n")
repo.add_file(file1)
- repo.branch('branch')
- repo.checkout('master')
+ repo.branch("branch")
+ repo.checkout("master")
- file2 = os.path.join(str(tmpdir), 'file2')
- with open(file2, 'w') as f:
- f.write('test\n')
+ file2 = os.path.join(str(tmpdir), "file2")
+ with open(file2, "w") as f:
+ f.write("test\n")
repo.add_file(file2)
- file3 = os.path.join(str(tmpdir), 'file3')
- with open(file3, 'w') as f:
- f.write('test\n')
+ file3 = os.path.join(str(tmpdir), "file3")
+ with open(file3, "w") as f:
+ f.write("test\n")
branch_boundary = repo.add_file(file3)
- repo.checkout('branch')
- file4 = os.path.join(str(tmpdir), 'file4')
- with open(file4, 'w') as f:
- f.write('test\n')
+ repo.checkout("branch")
+ file4 = os.path.join(str(tmpdir), "file4")
+ with open(file4, "w") as f:
+ f.write("test\n")
tagged_ref = repo.add_file(file4)
- repo.add_annotated_tag('tag1', 'tag1')
+ repo.add_annotated_tag("tag1", "tag1")
- head = repo.merge('master')
+ head = repo.merge("master")
config = repo.source_config()
- config['track'] = head
- config['track-tags'] = True
+ config["track"] = head
+ config["track-tags"] = True
# Write out our test target
element = {
- 'kind': 'import',
- 'sources': [
- config
- ],
+ "kind": "import",
+ "sources": [config],
}
- element_path = os.path.join(project, 'target.bst')
+ element_path = os.path.join(project, "target.bst")
_yaml.roundtrip_dump(element, element_path)
- result = cli.run(project=project, args=['source', 'track', 'target.bst', '--deps', 'all'])
+ result = cli.run(project=project, args=["source", "track", "target.bst", "--deps", "all"])
result.assert_success()
- checkout = os.path.join(str(tmpdir), 'checkout')
+ checkout = os.path.join(str(tmpdir), "checkout")
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkout])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkout])
result.assert_success()
- describe = subprocess.check_output(['git', 'describe'],
- cwd=checkout,
- universal_newlines=True)
- assert describe.startswith('tag1-2-')
+ describe = subprocess.check_output(["git", "describe"], cwd=checkout, universal_newlines=True)
+ assert describe.startswith("tag1-2-")
- rev_list = subprocess.check_output(['git', 'rev-list', '--all'],
- cwd=checkout,
- universal_newlines=True)
+ rev_list = subprocess.check_output(["git", "rev-list", "--all"], cwd=checkout, universal_newlines=True)
assert set(rev_list.splitlines()) == set([head, tagged_ref, branch_boundary])
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
def test_default_do_not_track_tags(cli, tmpdir, datafiles):
project = str(datafiles)
- project_config = _yaml.load(os.path.join(project, 'project.conf'))
- project_config['ref-storage'] = 'inline'
- _yaml.roundtrip_dump(project_config, os.path.join(project, 'project.conf'))
+ project_config = _yaml.load(os.path.join(project, "project.conf"))
+ project_config["ref-storage"] = "inline"
+ _yaml.roundtrip_dump(project_config, os.path.join(project, "project.conf"))
- repofiles = os.path.join(str(tmpdir), 'repofiles')
+ repofiles = os.path.join(str(tmpdir), "repofiles")
os.makedirs(repofiles, exist_ok=True)
- file0 = os.path.join(repofiles, 'file0')
- with open(file0, 'w') as f:
- f.write('test\n')
+ file0 = os.path.join(repofiles, "file0")
+ with open(file0, "w") as f:
+ f.write("test\n")
- repo = create_repo('git', str(tmpdir))
+ repo = create_repo("git", str(tmpdir))
repo.create(repofiles)
- repo.add_tag('tag')
+ repo.add_tag("tag")
config = repo.source_config()
- config['track'] = repo.latest_commit()
+ config["track"] = repo.latest_commit()
# Write out our test target
element = {
- 'kind': 'import',
- 'sources': [
- config
- ],
+ "kind": "import",
+ "sources": [config],
}
- element_path = os.path.join(project, 'target.bst')
+ element_path = os.path.join(project, "target.bst")
_yaml.roundtrip_dump(element, element_path)
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
element = _yaml.load(element_path)
- source = element.get_sequence('sources').mapping_at(0)
- assert 'tags' not in source
+ source = element.get_sequence("sources").mapping_at(0)
+ assert "tags" not in source
@pytest.mark.skipif(HAVE_GIT is False, reason="git is not available")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'template'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "template"))
def test_overwrite_rogue_tag_multiple_remotes(cli, tmpdir, datafiles):
"""When using multiple remotes in cache (i.e. when using aliases), we
need to make sure we override tags. This is not allowed to fetch
@@ -1139,88 +1015,74 @@ def test_overwrite_rogue_tag_multiple_remotes(cli, tmpdir, datafiles):
project = str(datafiles)
- repofiles = os.path.join(str(tmpdir), 'repofiles')
+ repofiles = os.path.join(str(tmpdir), "repofiles")
os.makedirs(repofiles, exist_ok=True)
- file0 = os.path.join(repofiles, 'file0')
- with open(file0, 'w') as f:
- f.write('test\n')
+ file0 = os.path.join(repofiles, "file0")
+ with open(file0, "w") as f:
+ f.write("test\n")
- repo = create_repo('git', str(tmpdir))
+ repo = create_repo("git", str(tmpdir))
top_commit = repo.create(repofiles)
repodir, reponame = os.path.split(repo.repo)
- project_config = _yaml.load(os.path.join(project, 'project.conf'))
- project_config['aliases'] = Node.from_dict({
- 'repo': 'http://example.com/'
- })
- project_config['mirrors'] = [
- {
- 'name': 'middle-earth',
- 'aliases': {
- 'repo': ['file://{}/'.format(repodir)]
- }
- }
- ]
- _yaml.roundtrip_dump(project_config, os.path.join(project, 'project.conf'))
+ project_config = _yaml.load(os.path.join(project, "project.conf"))
+ project_config["aliases"] = Node.from_dict({"repo": "http://example.com/"})
+ project_config["mirrors"] = [{"name": "middle-earth", "aliases": {"repo": ["file://{}/".format(repodir)]}}]
+ _yaml.roundtrip_dump(project_config, os.path.join(project, "project.conf"))
- repo.add_annotated_tag('tag', 'tag')
+ repo.add_annotated_tag("tag", "tag")
- file1 = os.path.join(repofiles, 'file1')
- with open(file1, 'w') as f:
- f.write('test\n')
+ file1 = os.path.join(repofiles, "file1")
+ with open(file1, "w") as f:
+ f.write("test\n")
ref = repo.add_file(file1)
config = repo.source_config(ref=ref)
- del config['track']
- config['url'] = 'repo:{}'.format(reponame)
+ del config["track"]
+ config["url"] = "repo:{}".format(reponame)
# Write out our test target
element = {
- 'kind': 'import',
- 'sources': [
- config
- ],
+ "kind": "import",
+ "sources": [config],
}
- element_path = os.path.join(project, 'target.bst')
+ element_path = os.path.join(project, "target.bst")
_yaml.roundtrip_dump(element, element_path)
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
repo.checkout(top_commit)
- file2 = os.path.join(repofiles, 'file2')
- with open(file2, 'w') as f:
- f.write('test\n')
+ file2 = os.path.join(repofiles, "file2")
+ with open(file2, "w") as f:
+ f.write("test\n")
new_ref = repo.add_file(file2)
- repo.delete_tag('tag')
- repo.add_annotated_tag('tag', 'tag')
- repo.checkout('master')
+ repo.delete_tag("tag")
+ repo.add_annotated_tag("tag", "tag")
+ repo.checkout("master")
- otherpath = os.path.join(str(tmpdir), 'other_path')
- shutil.copytree(repo.repo,
- os.path.join(otherpath, 'repo'))
- create_repo('git', otherpath)
+ otherpath = os.path.join(str(tmpdir), "other_path")
+ shutil.copytree(repo.repo, os.path.join(otherpath, "repo"))
+ create_repo("git", otherpath)
repodir, reponame = os.path.split(repo.repo)
- _yaml.roundtrip_dump(project_config, os.path.join(project, 'project.conf'))
+ _yaml.roundtrip_dump(project_config, os.path.join(project, "project.conf"))
config = repo.source_config(ref=new_ref)
- del config['track']
- config['url'] = 'repo:{}'.format(reponame)
+ del config["track"]
+ config["url"] = "repo:{}".format(reponame)
element = {
- 'kind': 'import',
- 'sources': [
- config
- ],
+ "kind": "import",
+ "sources": [config],
}
_yaml.roundtrip_dump(element, element_path)
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
diff --git a/tests/sources/keytest.py b/tests/sources/keytest.py
index d3eab8d6b..46d0d07fe 100644
--- a/tests/sources/keytest.py
+++ b/tests/sources/keytest.py
@@ -27,8 +27,7 @@ import pytest
from buildstream._exceptions import ErrorDomain
from buildstream.testing import cli # pylint: disable=unused-import
-DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)),
- "project_key_test")
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "project_key_test")
# using the key-test plugin to ensure get_unique_key is never called before
diff --git a/tests/sources/local.py b/tests/sources/local.py
index 4b72a4343..92313ca7f 100644
--- a/tests/sources/local.py
+++ b/tests/sources/local.py
@@ -10,103 +10,93 @@ from buildstream.testing import cli # pylint: disable=unused-import
from buildstream.testing._utils.site import HAVE_SANDBOX
from tests.testutils import filetypegenerator
-DATA_DIR = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'local',
-)
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "local",)
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'basic'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "basic"))
def test_missing_path(cli, datafiles):
project = str(datafiles)
# Removing the local file causes preflight to fail
- localfile = os.path.join(project, 'file.txt')
+ localfile = os.path.join(project, "file.txt")
os.remove(localfile)
- result = cli.run(project=project, args=[
- 'show', 'target.bst'
- ])
+ result = cli.run(project=project, args=["show", "target.bst"])
result.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.MISSING_FILE)
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'basic'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "basic"))
def test_non_regular_file_or_directory(cli, datafiles):
project = str(datafiles)
- localfile = os.path.join(project, 'file.txt')
+ localfile = os.path.join(project, "file.txt")
for _file_type in filetypegenerator.generate_file_types(localfile):
- result = cli.run(project=project, args=[
- 'show', 'target.bst'
- ])
+ result = cli.run(project=project, args=["show", "target.bst"])
if os.path.isdir(localfile) and not os.path.islink(localfile):
result.assert_success()
elif os.path.isfile(localfile) and not os.path.islink(localfile):
result.assert_success()
else:
- result.assert_main_error(ErrorDomain.LOAD,
- LoadErrorReason.PROJ_PATH_INVALID_KIND)
+ result.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.PROJ_PATH_INVALID_KIND)
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'basic'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "basic"))
def test_invalid_absolute_path(cli, datafiles):
project = str(datafiles)
- with open(os.path.join(project, "target.bst"), 'r') as f:
+ with open(os.path.join(project, "target.bst"), "r") as f:
old_yaml = f.read()
new_yaml = old_yaml.replace("file.txt", os.path.join(project, "file.txt"))
assert old_yaml != new_yaml
- with open(os.path.join(project, "target.bst"), 'w') as f:
+ with open(os.path.join(project, "target.bst"), "w") as f:
f.write(new_yaml)
- result = cli.run(project=project, args=['show', 'target.bst'])
- result.assert_main_error(ErrorDomain.LOAD,
- LoadErrorReason.PROJ_PATH_INVALID)
+ result = cli.run(project=project, args=["show", "target.bst"])
+ result.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.PROJ_PATH_INVALID)
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'invalid-relative-path'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "invalid-relative-path"))
def test_invalid_relative_path(cli, datafiles):
project = str(datafiles)
- result = cli.run(project=project, args=['show', 'target.bst'])
- result.assert_main_error(ErrorDomain.LOAD,
- LoadErrorReason.PROJ_PATH_INVALID)
+ result = cli.run(project=project, args=["show", "target.bst"])
+ result.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.PROJ_PATH_INVALID)
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'basic'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "basic"))
def test_stage_file(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Build, checkout
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the checkout contains the expected file
- assert os.path.exists(os.path.join(checkoutdir, 'file.txt'))
+ assert os.path.exists(os.path.join(checkoutdir, "file.txt"))
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'directory'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "directory"))
def test_stage_directory(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Build, checkout
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the checkout contains the expected file and directory and other file
- assert os.path.exists(os.path.join(checkoutdir, 'file.txt'))
- assert os.path.exists(os.path.join(checkoutdir, 'subdir', 'anotherfile.txt'))
+ assert os.path.exists(os.path.join(checkoutdir, "file.txt"))
+ assert os.path.exists(os.path.join(checkoutdir, "subdir", "anotherfile.txt"))
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'symlink'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "symlink"))
def test_stage_symlink(cli, tmpdir, datafiles):
project = str(datafiles)
@@ -117,104 +107,91 @@ def test_stage_symlink(cli, tmpdir, datafiles):
# https://github.com/omarkohl/pytest-datafiles/issues/1
#
# Create the symlink by hand.
- symlink = os.path.join(project, 'files', 'symlink-to-file.txt')
- os.symlink('file.txt', symlink)
+ symlink = os.path.join(project, "files", "symlink-to-file.txt")
+ os.symlink("file.txt", symlink)
# Build, checkout
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the checkout contains the expected file and directory and other file
- assert os.path.exists(os.path.join(checkoutdir, 'file.txt'))
- assert os.path.exists(os.path.join(checkoutdir, 'symlink-to-file.txt'))
- assert os.path.islink(os.path.join(checkoutdir, 'symlink-to-file.txt'))
+ assert os.path.exists(os.path.join(checkoutdir, "file.txt"))
+ assert os.path.exists(os.path.join(checkoutdir, "symlink-to-file.txt"))
+ assert os.path.islink(os.path.join(checkoutdir, "symlink-to-file.txt"))
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'file-exists'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "file-exists"))
def test_stage_file_exists(cli, datafiles):
project = str(datafiles)
# Build, checkout
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.ELEMENT, "import-source-files-fail")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'directory'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "directory"))
def test_stage_directory_symlink(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
- symlink = os.path.join(project, 'files', 'symlink-to-subdir')
- os.symlink('subdir', symlink)
+ symlink = os.path.join(project, "files", "symlink-to-subdir")
+ os.symlink("subdir", symlink)
# Build, checkout
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the checkout contains the expected directory and directory symlink
- assert os.path.exists(os.path.join(checkoutdir, 'subdir', 'anotherfile.txt'))
- assert os.path.exists(os.path.join(checkoutdir, 'symlink-to-subdir', 'anotherfile.txt'))
- assert os.path.islink(os.path.join(checkoutdir, 'symlink-to-subdir'))
+ assert os.path.exists(os.path.join(checkoutdir, "subdir", "anotherfile.txt"))
+ assert os.path.exists(os.path.join(checkoutdir, "symlink-to-subdir", "anotherfile.txt"))
+ assert os.path.islink(os.path.join(checkoutdir, "symlink-to-subdir"))
@pytest.mark.integration
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'deterministic-umask'))
-@pytest.mark.skipif(not HAVE_SANDBOX, reason='Only available with a functioning sandbox')
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "deterministic-umask"))
+@pytest.mark.skipif(not HAVE_SANDBOX, reason="Only available with a functioning sandbox")
def test_deterministic_source_umask(cli, tmpdir, datafiles):
-
- def create_test_file(*path, mode=0o644, content='content\n'):
+ def create_test_file(*path, mode=0o644, content="content\n"):
path = os.path.join(*path)
os.makedirs(os.path.dirname(path), exist_ok=True)
- with open(path, 'w') as f:
+ with open(path, "w") as f:
f.write(content)
os.fchmod(f.fileno(), mode)
def create_test_directory(*path, mode=0o644):
- create_test_file(*path, '.keep', content='')
+ create_test_file(*path, ".keep", content="")
path = os.path.join(*path)
os.chmod(path, mode)
project = str(datafiles)
- element_name = 'list.bst'
- element_path = os.path.join(project, 'elements', element_name)
- sourcedir = os.path.join(project, 'source')
-
- create_test_file(sourcedir, 'a.txt', mode=0o700)
- create_test_file(sourcedir, 'b.txt', mode=0o755)
- create_test_file(sourcedir, 'c.txt', mode=0o600)
- create_test_file(sourcedir, 'd.txt', mode=0o400)
- create_test_file(sourcedir, 'e.txt', mode=0o644)
- create_test_file(sourcedir, 'f.txt', mode=0o4755)
- create_test_file(sourcedir, 'g.txt', mode=0o2755)
- create_test_file(sourcedir, 'h.txt', mode=0o1755)
- create_test_directory(sourcedir, 'dir-a', mode=0o0700)
- create_test_directory(sourcedir, 'dir-c', mode=0o0755)
- create_test_directory(sourcedir, 'dir-d', mode=0o4755)
- create_test_directory(sourcedir, 'dir-e', mode=0o2755)
- create_test_directory(sourcedir, 'dir-f', mode=0o1755)
-
- source = {'kind': 'local',
- 'path': 'source'}
+ element_name = "list.bst"
+ element_path = os.path.join(project, "elements", element_name)
+ sourcedir = os.path.join(project, "source")
+
+ create_test_file(sourcedir, "a.txt", mode=0o700)
+ create_test_file(sourcedir, "b.txt", mode=0o755)
+ create_test_file(sourcedir, "c.txt", mode=0o600)
+ create_test_file(sourcedir, "d.txt", mode=0o400)
+ create_test_file(sourcedir, "e.txt", mode=0o644)
+ create_test_file(sourcedir, "f.txt", mode=0o4755)
+ create_test_file(sourcedir, "g.txt", mode=0o2755)
+ create_test_file(sourcedir, "h.txt", mode=0o1755)
+ create_test_directory(sourcedir, "dir-a", mode=0o0700)
+ create_test_directory(sourcedir, "dir-c", mode=0o0755)
+ create_test_directory(sourcedir, "dir-d", mode=0o4755)
+ create_test_directory(sourcedir, "dir-e", mode=0o2755)
+ create_test_directory(sourcedir, "dir-f", mode=0o1755)
+
+ source = {"kind": "local", "path": "source"}
element = {
- 'kind': 'manual',
- 'depends': [
- {
- 'filename': 'base.bst',
- 'type': 'build'
- }
- ],
- 'sources': [
- source
- ],
- 'config': {
- 'install-commands': [
- 'ls -l >"%{install-root}/ls-l"'
- ]
- }
+ "kind": "manual",
+ "depends": [{"filename": "base.bst", "type": "build"}],
+ "sources": [source],
+ "config": {"install-commands": ['ls -l >"%{install-root}/ls-l"']},
}
_yaml.roundtrip_dump(element, element_path)
diff --git a/tests/sources/no-fetch-cached/plugins/sources/always_cached.py b/tests/sources/no-fetch-cached/plugins/sources/always_cached.py
index fa143a020..623ab19ab 100644
--- a/tests/sources/no-fetch-cached/plugins/sources/always_cached.py
+++ b/tests/sources/no-fetch-cached/plugins/sources/always_cached.py
@@ -11,7 +11,6 @@ from buildstream import Consistency, Source
class AlwaysCachedSource(Source):
-
def configure(self, node):
pass
diff --git a/tests/sources/no_fetch_cached.py b/tests/sources/no_fetch_cached.py
index 81032881c..1ee3dd7bd 100644
--- a/tests/sources/no_fetch_cached.py
+++ b/tests/sources/no_fetch_cached.py
@@ -10,10 +10,7 @@ from buildstream.testing import cli # pylint: disable=unused-import
from buildstream.testing import create_repo
from buildstream.testing._utils.site import HAVE_GIT
-DATA_DIR = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'no-fetch-cached'
-)
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "no-fetch-cached")
##################################################################
@@ -26,23 +23,13 @@ def test_no_fetch_cached(cli, tmpdir, datafiles):
project = str(datafiles)
# Create the repo from 'files' subdir
- repo = create_repo('git', str(tmpdir))
- ref = repo.create(os.path.join(project, 'files'))
+ repo = create_repo("git", str(tmpdir))
+ ref = repo.create(os.path.join(project, "files"))
# Write out test target with a cached and a non-cached source
- element = {
- 'kind': 'import',
- 'sources': [
- repo.source_config(ref=ref),
- {
- 'kind': 'always_cached'
- }
- ]
- }
- _yaml.roundtrip_dump(element, os.path.join(project, 'target.bst'))
+ element = {"kind": "import", "sources": [repo.source_config(ref=ref), {"kind": "always_cached"}]}
+ _yaml.roundtrip_dump(element, os.path.join(project, "target.bst"))
# Test fetch of target with a cached and a non-cached source
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
diff --git a/tests/sources/patch.py b/tests/sources/patch.py
index 4f9db815b..2b80a7055 100644
--- a/tests/sources/patch.py
+++ b/tests/sources/patch.py
@@ -8,148 +8,137 @@ from buildstream._exceptions import ErrorDomain, LoadErrorReason
from buildstream.testing import cli # pylint: disable=unused-import
from tests.testutils import filetypegenerator
-DATA_DIR = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'patch',
-)
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "patch",)
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'basic'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "basic"))
def test_missing_patch(cli, datafiles):
project = str(datafiles)
# Removing the local file causes preflight to fail
- localfile = os.path.join(project, 'file_1.patch')
+ localfile = os.path.join(project, "file_1.patch")
os.remove(localfile)
- result = cli.run(project=project, args=[
- 'show', 'target.bst'
- ])
+ result = cli.run(project=project, args=["show", "target.bst"])
result.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.MISSING_FILE)
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'basic'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "basic"))
def test_non_regular_file_patch(cli, datafiles):
project = str(datafiles)
- patch_path = os.path.join(project, 'irregular_file.patch')
+ patch_path = os.path.join(project, "irregular_file.patch")
for _file_type in filetypegenerator.generate_file_types(patch_path):
- result = cli.run(project=project, args=[
- 'show', 'irregular.bst'
- ])
+ result = cli.run(project=project, args=["show", "irregular.bst"])
if os.path.isfile(patch_path) and not os.path.islink(patch_path):
result.assert_success()
else:
- result.assert_main_error(ErrorDomain.LOAD,
- LoadErrorReason.PROJ_PATH_INVALID_KIND)
+ result.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.PROJ_PATH_INVALID_KIND)
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'basic'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "basic"))
def test_invalid_absolute_path(cli, datafiles):
project = str(datafiles)
- with open(os.path.join(project, "target.bst"), 'r') as f:
+ with open(os.path.join(project, "target.bst"), "r") as f:
old_yaml = f.read()
- new_yaml = old_yaml.replace("file_1.patch",
- os.path.join(project, "file_1.patch"))
+ new_yaml = old_yaml.replace("file_1.patch", os.path.join(project, "file_1.patch"))
assert old_yaml != new_yaml
- with open(os.path.join(project, "target.bst"), 'w') as f:
+ with open(os.path.join(project, "target.bst"), "w") as f:
f.write(new_yaml)
- result = cli.run(project=project, args=['show', 'target.bst'])
- result.assert_main_error(ErrorDomain.LOAD,
- LoadErrorReason.PROJ_PATH_INVALID)
+ result = cli.run(project=project, args=["show", "target.bst"])
+ result.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.PROJ_PATH_INVALID)
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'invalid-relative-path'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "invalid-relative-path"))
def test_invalid_relative_path(cli, datafiles):
project = str(datafiles)
- result = cli.run(project=project, args=['show', 'irregular.bst'])
- result.assert_main_error(ErrorDomain.LOAD,
- LoadErrorReason.PROJ_PATH_INVALID)
+ result = cli.run(project=project, args=["show", "irregular.bst"])
+ result.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.PROJ_PATH_INVALID)
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'basic'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "basic"))
def test_stage_and_patch(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Build, checkout
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Test the file.txt was patched and changed
- with open(os.path.join(checkoutdir, 'file.txt')) as f:
- assert f.read() == 'This is text file with superpowers\n'
+ with open(os.path.join(checkoutdir, "file.txt")) as f:
+ assert f.read() == "This is text file with superpowers\n"
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'basic'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "basic"))
def test_stage_file_nonexistent_dir(cli, datafiles):
project = str(datafiles)
# Fails at build time because it tries to patch into a non-existing directory
- result = cli.run(project=project, args=['build', 'failure-nonexistent-dir.bst'])
+ result = cli.run(project=project, args=["build", "failure-nonexistent-dir.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, "patch-no-files")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'basic'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "basic"))
def test_stage_file_empty_dir(cli, datafiles):
project = str(datafiles)
# Fails at build time because it tries to patch with nothing else staged
- result = cli.run(project=project, args=['build', 'failure-empty-dir.bst'])
+ result = cli.run(project=project, args=["build", "failure-empty-dir.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, "patch-no-files")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'separate-patch-dir'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "separate-patch-dir"))
def test_stage_separate_patch_dir(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Test the file.txt was patched and changed
- with open(os.path.join(checkoutdir, 'test-dir', 'file.txt')) as f:
- assert f.read() == 'This is text file in a directory with superpowers\n'
+ with open(os.path.join(checkoutdir, "test-dir", "file.txt")) as f:
+ assert f.read() == "This is text file in a directory with superpowers\n"
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'multiple-patches'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "multiple-patches"))
def test_stage_multiple_patches(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Test the file.txt was patched and changed
- with open(os.path.join(checkoutdir, 'file.txt')) as f:
- assert f.read() == 'This is text file with more superpowers\n'
+ with open(os.path.join(checkoutdir, "file.txt")) as f:
+ assert f.read() == "This is text file with more superpowers\n"
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'different-strip-level'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "different-strip-level"))
def test_patch_strip_level(cli, tmpdir, datafiles):
project = str(datafiles)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Test the file.txt was patched and changed
- with open(os.path.join(checkoutdir, 'file.txt')) as f:
- assert f.read() == 'This is text file with superpowers\n'
+ with open(os.path.join(checkoutdir, "file.txt")) as f:
+ assert f.read() == "This is text file with superpowers\n"
diff --git a/tests/sources/pip.py b/tests/sources/pip.py
index 7f91ba701..aafdfaf1c 100644
--- a/tests/sources/pip.py
+++ b/tests/sources/pip.py
@@ -9,62 +9,56 @@ from buildstream import _yaml
from buildstream.plugins.sources.pip import _match_package_name
from buildstream.testing import cli # pylint: disable=unused-import
-DATA_DIR = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'pip',
-)
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "pip",)
def generate_project(project_dir):
project_file = os.path.join(project_dir, "project.conf")
- _yaml.roundtrip_dump({'name': 'foo'}, project_file)
+ _yaml.roundtrip_dump({"name": "foo"}, project_file)
# Test that without ref, consistency is set appropriately.
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-ref'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "no-ref"))
def test_no_ref(cli, datafiles):
project = str(datafiles)
generate_project(project)
- assert cli.get_element_state(project, 'target.bst') == 'no reference'
+ assert cli.get_element_state(project, "target.bst") == "no reference"
# Test that pip is not allowed to be the first source
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'first-source-pip'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "first-source-pip"))
def test_first_source(cli, datafiles):
project = str(datafiles)
generate_project(project)
- result = cli.run(project=project, args=[
- 'show', 'target.bst'
- ])
+ result = cli.run(project=project, args=["show", "target.bst"])
result.assert_main_error(ErrorDomain.ELEMENT, None)
# Test that error is raised when neither packges nor requirements files
# have been specified
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-packages'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "no-packages"))
def test_no_packages(cli, datafiles):
project = str(datafiles)
generate_project(project)
- result = cli.run(project=project, args=[
- 'show', 'target.bst'
- ])
+ result = cli.run(project=project, args=["show", "target.bst"])
result.assert_main_error(ErrorDomain.SOURCE, None)
# Test that pip source parses tar ball names correctly for the ref
@pytest.mark.parametrize(
- 'tarball, expected_name, expected_version',
+ "tarball, expected_name, expected_version",
[
- ('dotted.package-0.9.8.tar.gz', 'dotted.package', '0.9.8'),
- ('hyphenated-package-2.6.0.tar.gz', 'hyphenated-package', '2.6.0'),
- ('underscore_pkg-3.1.0.tar.gz', 'underscore_pkg', '3.1.0'),
- ('numbers2and5-1.0.1.tar.gz', 'numbers2and5', '1.0.1'),
- ('multiple.dots.package-5.6.7.tar.gz', 'multiple.dots.package', '5.6.7'),
- ('multiple-hyphens-package-1.2.3.tar.gz', 'multiple-hyphens-package', '1.2.3'),
- ('multiple_underscore_pkg-3.4.5.tar.gz', 'multiple_underscore_pkg', '3.4.5'),
- ('shortversion-1.0.tar.gz', 'shortversion', '1.0'),
- ('longversion-1.2.3.4.tar.gz', 'longversion', '1.2.3.4')
- ])
+ ("dotted.package-0.9.8.tar.gz", "dotted.package", "0.9.8"),
+ ("hyphenated-package-2.6.0.tar.gz", "hyphenated-package", "2.6.0"),
+ ("underscore_pkg-3.1.0.tar.gz", "underscore_pkg", "3.1.0"),
+ ("numbers2and5-1.0.1.tar.gz", "numbers2and5", "1.0.1"),
+ ("multiple.dots.package-5.6.7.tar.gz", "multiple.dots.package", "5.6.7"),
+ ("multiple-hyphens-package-1.2.3.tar.gz", "multiple-hyphens-package", "1.2.3"),
+ ("multiple_underscore_pkg-3.4.5.tar.gz", "multiple_underscore_pkg", "3.4.5"),
+ ("shortversion-1.0.tar.gz", "shortversion", "1.0"),
+ ("longversion-1.2.3.4.tar.gz", "longversion", "1.2.3.4"),
+ ],
+)
def test_match_package_name(tarball, expected_name, expected_version):
name, version = _match_package_name(tarball)
assert (expected_name, expected_version) == (name, version)
diff --git a/tests/sources/previous_source_access.py b/tests/sources/previous_source_access.py
index 750b94381..fadf6710c 100644
--- a/tests/sources/previous_source_access.py
+++ b/tests/sources/previous_source_access.py
@@ -7,10 +7,7 @@ import pytest
from buildstream import _yaml
from buildstream.testing import cli # pylint: disable=unused-import
-DATA_DIR = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'previous_source_access'
-)
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "previous_source_access")
##################################################################
@@ -29,30 +26,22 @@ def test_custom_transform_source(cli, datafiles):
_yaml.roundtrip_dump(project_config, project_config_path)
# Ensure we can track
- result = cli.run(project=project, args=[
- 'source', 'track', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
# Ensure we can fetch
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
# Ensure we get correct output from foo_transform
- cli.run(project=project, args=[
- 'build', 'target.bst'
- ])
- destpath = os.path.join(cli.directory, 'checkout')
- result = cli.run(project=project, args=[
- 'artifact', 'checkout', 'target.bst', '--directory', destpath
- ])
+ cli.run(project=project, args=["build", "target.bst"])
+ destpath = os.path.join(cli.directory, "checkout")
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", destpath])
result.assert_success()
# Assert that files from both sources exist, and that they have
# the same content
- assert os.path.exists(os.path.join(destpath, 'file'))
- assert os.path.exists(os.path.join(destpath, 'filetransform'))
- with open(os.path.join(destpath, 'file')) as file1:
- with open(os.path.join(destpath, 'filetransform')) as file2:
+ assert os.path.exists(os.path.join(destpath, "file"))
+ assert os.path.exists(os.path.join(destpath, "filetransform"))
+ with open(os.path.join(destpath, "file")) as file1:
+ with open(os.path.join(destpath, "filetransform")) as file2:
assert file1.read() == file2.read()
diff --git a/tests/sources/previous_source_access/plugins/sources/foo_transform.py b/tests/sources/previous_source_access/plugins/sources/foo_transform.py
index 4b423a1b3..906a8f6be 100644
--- a/tests/sources/previous_source_access/plugins/sources/foo_transform.py
+++ b/tests/sources/previous_source_access/plugins/sources/foo_transform.py
@@ -25,14 +25,13 @@ class FooTransformSource(Source):
"""Directory where this source should stage its files
"""
- path = os.path.join(self.get_mirror_directory(), self.name,
- self.ref.strip())
+ path = os.path.join(self.get_mirror_directory(), self.name, self.ref.strip())
os.makedirs(path, exist_ok=True)
return path
def configure(self, node):
- node.validate_keys(['ref', *Source.COMMON_CONFIG_KEYS])
- self.ref = node.get_str('ref', None)
+ node.validate_keys(["ref", *Source.COMMON_CONFIG_KEYS])
+ self.ref = node.get_str("ref", None)
def preflight(self):
pass
@@ -45,9 +44,9 @@ class FooTransformSource(Source):
return Consistency.INCONSISTENT
# If we have a file called "filetransform", verify that its checksum
# matches our ref. Otherwise, it resolved but not cached.
- fpath = os.path.join(self.mirror, 'filetransform')
+ fpath = os.path.join(self.mirror, "filetransform")
try:
- with open(fpath, 'rb') as f:
+ with open(fpath, "rb") as f:
if hashlib.sha256(f.read()).hexdigest() == self.ref.strip():
return Consistency.CACHED
except Exception:
@@ -58,30 +57,29 @@ class FooTransformSource(Source):
return self.ref
def set_ref(self, ref, node):
- self.ref = node['ref'] = ref
+ self.ref = node["ref"] = ref
def track(self, previous_sources_dir):
# Store the checksum of the file from previous source as our ref
- fpath = os.path.join(previous_sources_dir, 'file')
- with open(fpath, 'rb') as f:
+ fpath = os.path.join(previous_sources_dir, "file")
+ with open(fpath, "rb") as f:
return hashlib.sha256(f.read()).hexdigest()
def fetch(self, previous_sources_dir):
- fpath = os.path.join(previous_sources_dir, 'file')
+ fpath = os.path.join(previous_sources_dir, "file")
# Verify that the checksum of the file from previous source matches
# our ref
- with open(fpath, 'rb') as f:
+ with open(fpath, "rb") as f:
if hashlib.sha256(f.read()).hexdigest() != self.ref.strip():
raise SourceError("Element references do not match")
# Copy "file" as "filetransform"
- newfpath = os.path.join(self.mirror, 'filetransform')
+ newfpath = os.path.join(self.mirror, "filetransform")
utils.safe_copy(fpath, newfpath)
def stage(self, directory):
# Simply stage the "filetransform" file
- utils.safe_copy(os.path.join(self.mirror, 'filetransform'),
- os.path.join(directory, 'filetransform'))
+ utils.safe_copy(os.path.join(self.mirror, "filetransform"), os.path.join(directory, "filetransform"))
def setup():
diff --git a/tests/sources/remote.py b/tests/sources/remote.py
index 5b818b960..76e469c60 100644
--- a/tests/sources/remote.py
+++ b/tests/sources/remote.py
@@ -10,94 +10,71 @@ from buildstream import _yaml
from buildstream.testing import cli # pylint: disable=unused-import
from tests.testutils.file_server import create_file_server
-DATA_DIR = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'remote',
-)
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "remote",)
def generate_project(project_dir, tmpdir):
project_file = os.path.join(project_dir, "project.conf")
- _yaml.roundtrip_dump({
- 'name': 'foo',
- 'aliases': {
- 'tmpdir': "file:///" + str(tmpdir)
- }
- }, project_file)
+ _yaml.roundtrip_dump({"name": "foo", "aliases": {"tmpdir": "file:///" + str(tmpdir)}}, project_file)
def generate_project_file_server(server, project_dir):
project_file = os.path.join(project_dir, "project.conf")
- _yaml.roundtrip_dump({
- 'name': 'foo',
- 'aliases': {
- 'tmpdir': server.base_url()
- }
- }, project_file)
+ _yaml.roundtrip_dump({"name": "foo", "aliases": {"tmpdir": server.base_url()}}, project_file)
# Test that without ref, consistency is set appropriately.
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-ref'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "no-ref"))
def test_no_ref(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
- assert cli.get_element_state(project, 'target.bst') == 'no reference'
+ assert cli.get_element_state(project, "target.bst") == "no reference"
# Here we are doing a fetch on a file that doesn't exist. target.bst
# refers to 'file' but that file is not present.
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'missing-file'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "missing-file"))
def test_missing_file(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
# Try to fetch it
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, None)
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'path-in-filename'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "path-in-filename"))
def test_path_in_filename(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
# Try to fetch it
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
# The bst file has a / in the filename param
result.assert_main_error(ErrorDomain.SOURCE, "filename-contains-directory")
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'single-file'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "single-file"))
def test_simple_file_build(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Try to fetch it
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=[
- 'build', 'target.bst'
- ])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=[
- 'artifact', 'checkout', 'target.bst', '--directory', checkoutdir
- ])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Note that the url of the file in target.bst is actually /dir/file
# but this tests confirms we take the basename
- checkout_file = os.path.join(checkoutdir, 'file')
+ checkout_file = os.path.join(checkoutdir, "file")
assert os.path.exists(checkout_file)
mode = os.stat(checkout_file).st_mode
@@ -107,113 +84,99 @@ def test_simple_file_build(cli, tmpdir, datafiles):
assert not mode & (stat.S_IWGRP | stat.S_IWOTH)
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'single-file-custom-name'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "single-file-custom-name"))
def test_simple_file_custom_name_build(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
checkoutdir = os.path.join(str(tmpdir), "checkout")
# Try to fetch it
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=[
- 'build', 'target.bst'
- ])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=[
- 'artifact', 'checkout', 'target.bst', '--directory', checkoutdir
- ])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
- assert not os.path.exists(os.path.join(checkoutdir, 'file'))
- assert os.path.exists(os.path.join(checkoutdir, 'custom-file'))
+ assert not os.path.exists(os.path.join(checkoutdir, "file"))
+ assert os.path.exists(os.path.join(checkoutdir, "custom-file"))
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'unique-keys'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "unique-keys"))
def test_unique_key(cli, tmpdir, datafiles):
- '''This test confirms that the 'filename' parameter is honoured when it comes
+ """This test confirms that the 'filename' parameter is honoured when it comes
to generating a cache key for the source.
- '''
+ """
project = str(datafiles)
generate_project(project, tmpdir)
- states = cli.get_element_states(project, [
- 'target.bst', 'target-custom.bst', 'target-custom-executable.bst'
- ])
- assert states['target.bst'] == "fetch needed"
- assert states['target-custom.bst'] == "fetch needed"
- assert states['target-custom-executable.bst'] == "fetch needed"
+ states = cli.get_element_states(project, ["target.bst", "target-custom.bst", "target-custom-executable.bst"])
+ assert states["target.bst"] == "fetch needed"
+ assert states["target-custom.bst"] == "fetch needed"
+ assert states["target-custom-executable.bst"] == "fetch needed"
# Try to fetch it
- cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ cli.run(project=project, args=["source", "fetch", "target.bst"])
# We should download the file only once
- states = cli.get_element_states(project, [
- 'target.bst', 'target-custom.bst', 'target-custom-executable.bst'
- ])
- assert states['target.bst'] == 'buildable'
- assert states['target-custom.bst'] == 'buildable'
- assert states['target-custom-executable.bst'] == 'buildable'
+ states = cli.get_element_states(project, ["target.bst", "target-custom.bst", "target-custom-executable.bst"])
+ assert states["target.bst"] == "buildable"
+ assert states["target-custom.bst"] == "buildable"
+ assert states["target-custom-executable.bst"] == "buildable"
# But the cache key is different because the 'filename' is different.
- assert cli.get_element_key(project, 'target.bst') != \
- cli.get_element_key(project, 'target-custom.bst') != \
- cli.get_element_key(project, 'target-custom-executable.bst')
+ assert (
+ cli.get_element_key(project, "target.bst")
+ != cli.get_element_key(project, "target-custom.bst")
+ != cli.get_element_key(project, "target-custom-executable.bst")
+ )
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'unique-keys'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "unique-keys"))
def test_executable(cli, tmpdir, datafiles):
- '''This test confirms that the 'ecxecutable' parameter is honoured.
- '''
+ """This test confirms that the 'ecxecutable' parameter is honoured.
+ """
project = str(datafiles)
generate_project(project, tmpdir)
checkoutdir = os.path.join(str(tmpdir), "checkout")
- assert cli.get_element_state(project, 'target-custom-executable.bst') == "fetch needed"
+ assert cli.get_element_state(project, "target-custom-executable.bst") == "fetch needed"
# Try to fetch it
- cli.run(project=project, args=[
- 'build', 'target-custom-executable.bst'
- ])
-
- cli.run(project=project, args=[
- 'artifact', 'checkout', 'target-custom-executable.bst', '--directory', checkoutdir
- ])
- mode = os.stat(os.path.join(checkoutdir, 'some-custom-file')).st_mode
+ cli.run(project=project, args=["build", "target-custom-executable.bst"])
+
+ cli.run(project=project, args=["artifact", "checkout", "target-custom-executable.bst", "--directory", checkoutdir])
+ mode = os.stat(os.path.join(checkoutdir, "some-custom-file")).st_mode
assert mode & stat.S_IEXEC
# Assert executable by anyone
assert mode & (stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
-@pytest.mark.parametrize('server_type', ('FTP', 'HTTP'))
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'single-file'))
+@pytest.mark.parametrize("server_type", ("FTP", "HTTP"))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "single-file"))
def test_use_netrc(cli, datafiles, server_type, tmpdir):
- fake_home = os.path.join(str(tmpdir), 'fake_home')
+ fake_home = os.path.join(str(tmpdir), "fake_home")
os.makedirs(fake_home, exist_ok=True)
project = str(datafiles)
- checkoutdir = os.path.join(str(tmpdir), 'checkout')
+ checkoutdir = os.path.join(str(tmpdir), "checkout")
- os.environ['HOME'] = fake_home
- with open(os.path.join(fake_home, '.netrc'), 'wb') as f:
+ os.environ["HOME"] = fake_home
+ with open(os.path.join(fake_home, ".netrc"), "wb") as f:
os.fchmod(f.fileno(), 0o700)
- f.write(b'machine 127.0.0.1\n')
- f.write(b'login testuser\n')
- f.write(b'password 12345\n')
+ f.write(b"machine 127.0.0.1\n")
+ f.write(b"login testuser\n")
+ f.write(b"password 12345\n")
with create_file_server(server_type) as server:
- server.add_user('testuser', '12345', project)
+ server.add_user("testuser", "12345", project)
generate_project_file_server(server, project)
server.start()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
- checkout_file = os.path.join(checkoutdir, 'file')
+ checkout_file = os.path.join(checkoutdir, "file")
assert os.path.exists(checkout_file)
diff --git a/tests/sources/tar.py b/tests/sources/tar.py
index fac6f3f8b..f5f3dde2d 100644
--- a/tests/sources/tar.py
+++ b/tests/sources/tar.py
@@ -17,10 +17,7 @@ from buildstream.testing._utils.site import HAVE_LZIP
from tests.testutils.file_server import create_file_server
from . import list_dir_contents
-DATA_DIR = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'tar',
-)
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "tar",)
def _assemble_tar(workingdir, srcdir, dstfile):
@@ -38,58 +35,44 @@ def _assemble_tar_lz(workingdir, srcdir, dstfile):
with tarfile.open(fileobj=uncompressed, mode="w:") as tar:
tar.add(srcdir)
uncompressed.seek(0, 0)
- with open(dstfile, 'wb') as dst:
- subprocess.call(['lzip'],
- stdin=uncompressed,
- stdout=dst)
+ with open(dstfile, "wb") as dst:
+ subprocess.call(["lzip"], stdin=uncompressed, stdout=dst)
os.chdir(old_dir)
def generate_project(project_dir, tmpdir):
project_file = os.path.join(project_dir, "project.conf")
- _yaml.roundtrip_dump({
- 'name': 'foo',
- 'aliases': {
- 'tmpdir': "file:///" + str(tmpdir)
- }
- }, project_file)
+ _yaml.roundtrip_dump({"name": "foo", "aliases": {"tmpdir": "file:///" + str(tmpdir)}}, project_file)
def generate_project_file_server(base_url, project_dir):
project_file = os.path.join(project_dir, "project.conf")
- _yaml.roundtrip_dump({
- 'name': 'foo',
- 'aliases': {
- 'tmpdir': base_url
- }
- }, project_file)
+ _yaml.roundtrip_dump({"name": "foo", "aliases": {"tmpdir": base_url}}, project_file)
# Test that without ref, consistency is set appropriately.
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-ref'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "no-ref"))
def test_no_ref(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
- assert cli.get_element_state(project, 'target.bst') == 'no reference'
+ assert cli.get_element_state(project, "target.bst") == "no reference"
# Test that when I fetch a nonexistent URL, errors are handled gracefully and a retry is performed.
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_fetch_bad_url(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
# Try to fetch it
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
assert "FAILURE Try #" in result.stderr
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, None)
# Test that when I fetch with an invalid ref, it fails.
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_fetch_bad_ref(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -99,15 +82,13 @@ def test_fetch_bad_ref(cli, tmpdir, datafiles):
_assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar)
# Try to fetch it
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, None)
# Test that when tracking with a ref set, there is a warning
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_track_warning(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -117,15 +98,13 @@ def test_track_warning(cli, tmpdir, datafiles):
_assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar)
# Track it
- result = cli.run(project=project, args=[
- 'source', 'track', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
assert "Potential man-in-the-middle attack!" in result.stderr
# Test that a staged checkout matches what was tarred up, with the default first subdir
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
@pytest.mark.parametrize("srcdir", ["a", "./a"])
def test_stage_default_basedir(cli, tmpdir, datafiles, srcdir):
project = str(datafiles)
@@ -137,13 +116,13 @@ def test_stage_default_basedir(cli, tmpdir, datafiles, srcdir):
_assemble_tar(os.path.join(str(datafiles), "content"), srcdir, src_tar)
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the content of the first directory is checked out (base-dir: '*')
@@ -154,7 +133,7 @@ def test_stage_default_basedir(cli, tmpdir, datafiles, srcdir):
# Test that a staged checkout matches what was tarred up, with an empty base-dir
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-basedir'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "no-basedir"))
@pytest.mark.parametrize("srcdir", ["a", "./a"])
def test_stage_no_basedir(cli, tmpdir, datafiles, srcdir):
project = str(datafiles)
@@ -166,13 +145,13 @@ def test_stage_no_basedir(cli, tmpdir, datafiles, srcdir):
_assemble_tar(os.path.join(str(datafiles), "content"), srcdir, src_tar)
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the full content of the tarball is checked out (base-dir: '')
@@ -183,7 +162,7 @@ def test_stage_no_basedir(cli, tmpdir, datafiles, srcdir):
# Test that a staged checkout matches what was tarred up, with an explicit basedir
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'explicit-basedir'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "explicit-basedir"))
@pytest.mark.parametrize("srcdir", ["a", "./a"])
def test_stage_explicit_basedir(cli, tmpdir, datafiles, srcdir):
project = str(datafiles)
@@ -195,13 +174,13 @@ def test_stage_explicit_basedir(cli, tmpdir, datafiles, srcdir):
_assemble_tar(os.path.join(str(datafiles), "content"), srcdir, src_tar)
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the content of the first directory is checked out (base-dir: '*')
@@ -213,7 +192,7 @@ def test_stage_explicit_basedir(cli, tmpdir, datafiles, srcdir):
# Test that we succeed to extract tarballs with hardlinks when stripping the
# leading paths
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'contains-links'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "contains-links"))
def test_stage_contains_links(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -231,13 +210,13 @@ def test_stage_contains_links(cli, tmpdir, datafiles):
_assemble_tar(os.path.join(str(datafiles), "content"), "base-directory", src_tar)
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the content of the first directory is checked out (base-dir: '*')
@@ -247,8 +226,8 @@ def test_stage_contains_links(cli, tmpdir, datafiles):
assert checkout_contents == original_contents
-@pytest.mark.skipif(not HAVE_LZIP, reason='lzip is not available')
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.skipif(not HAVE_LZIP, reason="lzip is not available")
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
@pytest.mark.parametrize("srcdir", ["a", "./a"])
def test_stage_default_basedir_lzip(cli, tmpdir, datafiles, srcdir):
project = str(datafiles)
@@ -260,13 +239,13 @@ def test_stage_default_basedir_lzip(cli, tmpdir, datafiles, srcdir):
_assemble_tar_lz(os.path.join(str(datafiles), "content"), srcdir, src_tar)
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['source', 'track', 'target-lz.bst'])
+ result = cli.run(project=project, args=["source", "track", "target-lz.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target-lz.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target-lz.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target-lz.bst'])
+ result = cli.run(project=project, args=["build", "target-lz.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target-lz.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target-lz.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the content of the first directory is checked out (base-dir: '*')
@@ -280,8 +259,8 @@ def test_stage_default_basedir_lzip(cli, tmpdir, datafiles, srcdir):
# a - contains read-only files in a writable directory
# b - root directory has read-only permission
# c - contains one file that has no read nor write permissions. Base-dir set to '' to extract root of tarball
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'read-only'))
-@pytest.mark.parametrize("tar_name, base_dir", [("a", "*"), ("b", '*'), ("c", '')])
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "read-only"))
+@pytest.mark.parametrize("tar_name, base_dir", [("a", "*"), ("b", "*"), ("c", "")])
def test_read_only_dir(cli, tmpdir, datafiles, tar_name, base_dir):
try:
project = str(datafiles)
@@ -290,25 +269,21 @@ def test_read_only_dir(cli, tmpdir, datafiles, tar_name, base_dir):
bst_path = os.path.join(project, "target.bst")
tar_file = "{}.tar.gz".format(tar_name)
- _yaml.roundtrip_dump({
- 'kind': 'import',
- 'sources': [
- {
- 'kind': 'tar',
- 'url': 'tmpdir:/{}'.format(tar_file),
- 'ref': 'foo',
- 'base-dir': base_dir
- }
- ]
- }, bst_path)
+ _yaml.roundtrip_dump(
+ {
+ "kind": "import",
+ "sources": [{"kind": "tar", "url": "tmpdir:/{}".format(tar_file), "ref": "foo", "base-dir": base_dir}],
+ },
+ bst_path,
+ )
# Get the tarball in tests/sources/tar/read-only/content
#
# NOTE that we need to do this because tarfile.open and tar.add()
# are packing the tar up with writeable files and dirs
- tarball = os.path.join(str(datafiles), 'content', tar_file)
+ tarball = os.path.join(str(datafiles), "content", tar_file)
if not os.path.exists(tarball):
- raise FileNotFoundError('{} does not exist'.format(tarball))
+ raise FileNotFoundError("{} does not exist".format(tarball))
copyfile(tarball, os.path.join(str(tmpdir), tar_file))
# Because this test can potentially leave directories behind
@@ -320,11 +295,11 @@ def test_read_only_dir(cli, tmpdir, datafiles, tar_name, base_dir):
env = {"TMP": tmpdir_str}
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['source', 'track', 'target.bst'], env=env)
+ result = cli.run(project=project, args=["source", "track", "target.bst"], env=env)
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'], env=env)
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"], env=env)
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'], env=env)
+ result = cli.run(project=project, args=["build", "target.bst"], env=env)
result.assert_success()
finally:
@@ -336,85 +311,86 @@ def test_read_only_dir(cli, tmpdir, datafiles, tar_name, base_dir):
os.rmdir(path)
else:
os.remove(path)
+
rmtree(str(tmpdir), onerror=make_dir_writable)
-@pytest.mark.parametrize('server_type', ('FTP', 'HTTP'))
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.parametrize("server_type", ("FTP", "HTTP"))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_use_netrc(cli, datafiles, server_type, tmpdir):
- file_server_files = os.path.join(str(tmpdir), 'file_server')
- fake_home = os.path.join(str(tmpdir), 'fake_home')
+ file_server_files = os.path.join(str(tmpdir), "file_server")
+ fake_home = os.path.join(str(tmpdir), "fake_home")
os.makedirs(file_server_files, exist_ok=True)
os.makedirs(fake_home, exist_ok=True)
project = str(datafiles)
- checkoutdir = os.path.join(str(tmpdir), 'checkout')
+ checkoutdir = os.path.join(str(tmpdir), "checkout")
- os.environ['HOME'] = fake_home
- with open(os.path.join(fake_home, '.netrc'), 'wb') as f:
+ os.environ["HOME"] = fake_home
+ with open(os.path.join(fake_home, ".netrc"), "wb") as f:
os.fchmod(f.fileno(), 0o700)
- f.write(b'machine 127.0.0.1\n')
- f.write(b'login testuser\n')
- f.write(b'password 12345\n')
+ f.write(b"machine 127.0.0.1\n")
+ f.write(b"login testuser\n")
+ f.write(b"password 12345\n")
with create_file_server(server_type) as server:
- server.add_user('testuser', '12345', file_server_files)
+ server.add_user("testuser", "12345", file_server_files)
generate_project_file_server(server.base_url(), project)
- src_tar = os.path.join(file_server_files, 'a.tar.gz')
- _assemble_tar(os.path.join(str(datafiles), 'content'), 'a', src_tar)
+ src_tar = os.path.join(file_server_files, "a.tar.gz")
+ _assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar)
server.start()
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
- original_dir = os.path.join(str(datafiles), 'content', 'a')
+ original_dir = os.path.join(str(datafiles), "content", "a")
original_contents = list_dir_contents(original_dir)
checkout_contents = list_dir_contents(checkoutdir)
assert checkout_contents == original_contents
-@pytest.mark.parametrize('server_type', ('FTP', 'HTTP'))
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.parametrize("server_type", ("FTP", "HTTP"))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_netrc_already_specified_user(cli, datafiles, server_type, tmpdir):
- file_server_files = os.path.join(str(tmpdir), 'file_server')
- fake_home = os.path.join(str(tmpdir), 'fake_home')
+ file_server_files = os.path.join(str(tmpdir), "file_server")
+ fake_home = os.path.join(str(tmpdir), "fake_home")
os.makedirs(file_server_files, exist_ok=True)
os.makedirs(fake_home, exist_ok=True)
project = str(datafiles)
- os.environ['HOME'] = fake_home
- with open(os.path.join(fake_home, '.netrc'), 'wb') as f:
+ os.environ["HOME"] = fake_home
+ with open(os.path.join(fake_home, ".netrc"), "wb") as f:
os.fchmod(f.fileno(), 0o700)
- f.write(b'machine 127.0.0.1\n')
- f.write(b'login testuser\n')
- f.write(b'password 12345\n')
+ f.write(b"machine 127.0.0.1\n")
+ f.write(b"login testuser\n")
+ f.write(b"password 12345\n")
with create_file_server(server_type) as server:
- server.add_user('otheruser', '12345', file_server_files)
+ server.add_user("otheruser", "12345", file_server_files)
parts = urllib.parse.urlsplit(server.base_url())
- base_url = urllib.parse.urlunsplit([parts[0], 'otheruser@{}'.format(parts[1]), *parts[2:]])
+ base_url = urllib.parse.urlunsplit([parts[0], "otheruser@{}".format(parts[1]), *parts[2:]])
generate_project_file_server(base_url, project)
- src_tar = os.path.join(file_server_files, 'a.tar.gz')
- _assemble_tar(os.path.join(str(datafiles), 'content'), 'a', src_tar)
+ src_tar = os.path.join(file_server_files, "a.tar.gz")
+ _assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar)
server.start()
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, None)
# Test that BuildStream doesnt crash if HOME is unset while
# the netrc module is trying to find it's ~/.netrc file.
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_homeless_environment(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -424,11 +400,11 @@ def test_homeless_environment(cli, tmpdir, datafiles):
_assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar)
# Use a track, make sure the plugin tries to find a ~/.netrc
- result = cli.run(project=project, args=['source', 'track', 'target.bst'], env={'HOME': None})
+ result = cli.run(project=project, args=["source", "track", "target.bst"], env={"HOME": None})
result.assert_success()
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'out-of-basedir-hardlinks'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "out-of-basedir-hardlinks"))
def test_out_of_basedir_hardlinks(cli, tmpdir, datafiles):
def ensure_link(member):
# By default, python will simply duplicate files - we want
@@ -453,28 +429,28 @@ def test_out_of_basedir_hardlinks(cli, tmpdir, datafiles):
# Make sure our tarfile is actually created with the desired
# attributes set
with tarfile.open(src_tar, "r:gz") as tar:
- assert any(member.islnk() and
- member.path == "contents/to_extract/a" and
- member.linkname == "contents/elsewhere/a"
- for member in tar.getmembers())
+ assert any(
+ member.islnk() and member.path == "contents/to_extract/a" and member.linkname == "contents/elsewhere/a"
+ for member in tar.getmembers()
+ )
# Assert that we will actually create a singular copy of the file
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
- original_dir = os.path.join(str(datafiles), 'contents', 'to_extract')
+ original_dir = os.path.join(str(datafiles), "contents", "to_extract")
original_contents = list_dir_contents(original_dir)
checkout_contents = list_dir_contents(checkoutdir)
assert checkout_contents == original_contents
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'out-of-basedir-hardlinks'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "out-of-basedir-hardlinks"))
def test_malicious_out_of_basedir_hardlinks(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -499,13 +475,15 @@ def test_malicious_out_of_basedir_hardlinks(cli, tmpdir, datafiles):
# Make sure our tarfile is actually created with the desired
# attributes set
with tarfile.open(src_tar, "r:gz") as tar:
- assert any(member.islnk() and
- member.path == "contents/elsewhere/malicious" and
- member.linkname == "../../../malicious_target.bst"
- for member in tar.getmembers())
+ assert any(
+ member.islnk()
+ and member.path == "contents/elsewhere/malicious"
+ and member.linkname == "../../../malicious_target.bst"
+ for member in tar.getmembers()
+ )
# Try to execute the exploit
- result = cli.run(project=project, args=['source', 'track', 'malicious_target.bst'])
+ result = cli.run(project=project, args=["source", "track", "malicious_target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'malicious_target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "malicious_target.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
diff --git a/tests/sources/zip.py b/tests/sources/zip.py
index 3fd43b4bb..dcb1e2637 100644
--- a/tests/sources/zip.py
+++ b/tests/sources/zip.py
@@ -12,17 +12,14 @@ from buildstream.testing import cli # pylint: disable=unused-import
from tests.testutils.file_server import create_file_server
from . import list_dir_contents
-DATA_DIR = os.path.join(
- os.path.dirname(os.path.realpath(__file__)),
- 'zip',
-)
+DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "zip",)
def _assemble_zip(workingdir, dstfile):
old_dir = os.getcwd()
os.chdir(workingdir)
with zipfile.ZipFile(dstfile, "w") as zipfp:
- for root, dirs, files in os.walk('.'):
+ for root, dirs, files in os.walk("."):
names = dirs + files
names = [os.path.join(root, name) for name in names]
for name in names:
@@ -32,49 +29,37 @@ def _assemble_zip(workingdir, dstfile):
def generate_project(project_dir, tmpdir):
project_file = os.path.join(project_dir, "project.conf")
- _yaml.roundtrip_dump({
- 'name': 'foo',
- 'aliases': {
- 'tmpdir': "file:///" + str(tmpdir)
- }
- }, project_file)
+ _yaml.roundtrip_dump({"name": "foo", "aliases": {"tmpdir": "file:///" + str(tmpdir)}}, project_file)
def generate_project_file_server(server, project_dir):
project_file = os.path.join(project_dir, "project.conf")
- _yaml.roundtrip_dump({
- 'name': 'foo',
- 'aliases': {
- 'tmpdir': server.base_url()
- }
- }, project_file)
+ _yaml.roundtrip_dump({"name": "foo", "aliases": {"tmpdir": server.base_url()}}, project_file)
# Test that without ref, consistency is set appropriately.
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-ref'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "no-ref"))
def test_no_ref(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
- assert cli.get_element_state(project, 'target.bst') == 'no reference'
+ assert cli.get_element_state(project, "target.bst") == "no reference"
# Test that when I fetch a nonexistent URL, errors are handled gracefully and a retry is performed.
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_fetch_bad_url(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
# Try to fetch it
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
assert "FAILURE Try #" in result.stderr
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, None)
# Test that when I fetch with an invalid ref, it fails.
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_fetch_bad_ref(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -84,15 +69,13 @@ def test_fetch_bad_ref(cli, tmpdir, datafiles):
_assemble_zip(os.path.join(str(datafiles), "content"), src_zip)
# Try to fetch it
- result = cli.run(project=project, args=[
- 'source', 'fetch', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_main_error(ErrorDomain.STREAM, None)
result.assert_task_error(ErrorDomain.SOURCE, None)
# Test that when tracking with a ref set, there is a warning
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_track_warning(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -102,15 +85,13 @@ def test_track_warning(cli, tmpdir, datafiles):
_assemble_zip(os.path.join(str(datafiles), "content"), src_zip)
# Track it
- result = cli.run(project=project, args=[
- 'source', 'track', 'target.bst'
- ])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
assert "Potential man-in-the-middle attack!" in result.stderr
# Test that a staged checkout matches what was tarred up, with the default first subdir
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_stage_default_basedir(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -121,13 +102,13 @@ def test_stage_default_basedir(cli, tmpdir, datafiles):
_assemble_zip(os.path.join(str(datafiles), "content"), src_zip)
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the content of the first directory is checked out (base-dir: '*')
@@ -138,7 +119,7 @@ def test_stage_default_basedir(cli, tmpdir, datafiles):
# Test that a staged checkout matches what was tarred up, with an empty base-dir
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'no-basedir'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "no-basedir"))
def test_stage_no_basedir(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -149,13 +130,13 @@ def test_stage_no_basedir(cli, tmpdir, datafiles):
_assemble_zip(os.path.join(str(datafiles), "content"), src_zip)
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the full content of the tarball is checked out (base-dir: '')
@@ -166,7 +147,7 @@ def test_stage_no_basedir(cli, tmpdir, datafiles):
# Test that a staged checkout matches what was tarred up, with an explicit basedir
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'explicit-basedir'))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "explicit-basedir"))
def test_stage_explicit_basedir(cli, tmpdir, datafiles):
project = str(datafiles)
generate_project(project, tmpdir)
@@ -177,13 +158,13 @@ def test_stage_explicit_basedir(cli, tmpdir, datafiles):
_assemble_zip(os.path.join(str(datafiles), "content"), src_zip)
# Track, fetch, build, checkout
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
# Check that the content of the first directory is checked out (base-dir: '*')
@@ -193,42 +174,42 @@ def test_stage_explicit_basedir(cli, tmpdir, datafiles):
assert checkout_contents == original_contents
-@pytest.mark.parametrize('server_type', ('FTP', 'HTTP'))
-@pytest.mark.datafiles(os.path.join(DATA_DIR, 'fetch'))
+@pytest.mark.parametrize("server_type", ("FTP", "HTTP"))
+@pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch"))
def test_use_netrc(cli, datafiles, server_type, tmpdir):
- file_server_files = os.path.join(str(tmpdir), 'file_server')
- fake_home = os.path.join(str(tmpdir), 'fake_home')
+ file_server_files = os.path.join(str(tmpdir), "file_server")
+ fake_home = os.path.join(str(tmpdir), "fake_home")
os.makedirs(file_server_files, exist_ok=True)
os.makedirs(fake_home, exist_ok=True)
project = str(datafiles)
- checkoutdir = os.path.join(str(tmpdir), 'checkout')
+ checkoutdir = os.path.join(str(tmpdir), "checkout")
- os.environ['HOME'] = fake_home
- with open(os.path.join(fake_home, '.netrc'), 'wb') as f:
+ os.environ["HOME"] = fake_home
+ with open(os.path.join(fake_home, ".netrc"), "wb") as f:
os.fchmod(f.fileno(), 0o700)
- f.write(b'machine 127.0.0.1\n')
- f.write(b'login testuser\n')
- f.write(b'password 12345\n')
+ f.write(b"machine 127.0.0.1\n")
+ f.write(b"login testuser\n")
+ f.write(b"password 12345\n")
with create_file_server(server_type) as server:
- server.add_user('testuser', '12345', file_server_files)
+ server.add_user("testuser", "12345", file_server_files)
generate_project_file_server(server, project)
- src_zip = os.path.join(file_server_files, 'a.zip')
- _assemble_zip(os.path.join(str(datafiles), 'content'), src_zip)
+ src_zip = os.path.join(file_server_files, "a.zip")
+ _assemble_zip(os.path.join(str(datafiles), "content"), src_zip)
server.start()
- result = cli.run(project=project, args=['source', 'track', 'target.bst'])
+ result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['source', 'fetch', 'target.bst'])
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['build', 'target.bst'])
+ result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- result = cli.run(project=project, args=['artifact', 'checkout', 'target.bst', '--directory', checkoutdir])
+ result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir])
result.assert_success()
- original_dir = os.path.join(str(datafiles), 'content', 'a')
+ original_dir = os.path.join(str(datafiles), "content", "a")
original_contents = list_dir_contents(original_dir)
checkout_contents = list_dir_contents(checkoutdir)
assert checkout_contents == original_contents