# Pylint doesn't play well with fixtures and dependency injection from pytest # pylint: disable=redefined-outer-name import os from shutil import copyfile import subprocess import tarfile import tempfile import urllib.parse import pytest from buildstream import utils from buildstream.exceptions import ErrorDomain from buildstream.testing import generate_project, generate_element, SourceTests from buildstream.testing import cli # pylint: disable=unused-import from buildstream.testing._utils.site import HAVE_LZIP from tests.testutils.file_server import create_file_server from tests.testutils.repo.tar import Tar from . import list_dir_contents DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "tar",) def _assemble_tar(workingdir, srcdir, dstfile): old_dir = os.getcwd() os.chdir(workingdir) with tarfile.open(dstfile, "w:gz") as tar: tar.add(srcdir) os.chdir(old_dir) def _assemble_tar_lz(workingdir, srcdir, dstfile): old_dir = os.getcwd() os.chdir(workingdir) with tempfile.TemporaryFile() as uncompressed: with tarfile.open(fileobj=uncompressed, mode="w:") as tar: tar.add(srcdir) uncompressed.seek(0, 0) with open(dstfile, "wb") as dst: subprocess.call(["lzip"], stdin=uncompressed, stdout=dst) os.chdir(old_dir) class TestTarSource(SourceTests): KIND = "tar" REPO = Tar # Test that without ref, consistency is set appropriately. @pytest.mark.datafiles(os.path.join(DATA_DIR, "no-ref")) def test_no_ref(cli, tmpdir, datafiles): project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) assert cli.get_element_state(project, "target.bst") == "no reference" # Test that when I fetch a nonexistent URL, errors are handled gracefully and a retry is performed. @pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch")) def test_fetch_bad_url(cli, tmpdir, datafiles): project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) # Try to fetch it result = cli.run(project=project, args=["source", "fetch", "target.bst"]) assert "FAILURE Try #" in result.stderr result.assert_main_error(ErrorDomain.STREAM, None) result.assert_task_error(ErrorDomain.SOURCE, None) # Test that when I fetch with an invalid ref, it fails. @pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch")) def test_fetch_bad_ref(cli, tmpdir, datafiles): project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) # Create a local tar src_tar = os.path.join(str(tmpdir), "a.tar.gz") _assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar) # Try to fetch it result = cli.run(project=project, args=["source", "fetch", "target.bst"]) result.assert_main_error(ErrorDomain.STREAM, None) result.assert_task_error(ErrorDomain.SOURCE, None) # Test that when tracking with a ref set, there is a warning @pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch")) def test_track_warning(cli, tmpdir, datafiles): project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) # Create a local tar src_tar = os.path.join(str(tmpdir), "a.tar.gz") _assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar) # Track it result = cli.run(project=project, args=["source", "track", "target.bst"]) result.assert_success() assert "Potential man-in-the-middle attack!" in result.stderr # Test that a staged checkout matches what was tarred up, with the default first subdir @pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch")) @pytest.mark.parametrize("srcdir", ["a", "./a"]) def test_stage_default_basedir(cli, tmpdir, datafiles, srcdir): project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) checkoutdir = os.path.join(str(tmpdir), "checkout") # Create a local tar src_tar = os.path.join(str(tmpdir), "a.tar.gz") _assemble_tar(os.path.join(str(datafiles), "content"), srcdir, src_tar) # Track, fetch, build, checkout result = cli.run(project=project, args=["source", "track", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["source", "fetch", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["build", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir]) result.assert_success() # Check that the content of the first directory is checked out (base-dir: '*') original_dir = os.path.join(str(datafiles), "content", "a") original_contents = list_dir_contents(original_dir) checkout_contents = list_dir_contents(checkoutdir) assert checkout_contents == original_contents # Test that a staged checkout matches what was tarred up, with an empty base-dir @pytest.mark.datafiles(os.path.join(DATA_DIR, "no-basedir")) @pytest.mark.parametrize("srcdir", ["a", "./a"]) def test_stage_no_basedir(cli, tmpdir, datafiles, srcdir): project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) checkoutdir = os.path.join(str(tmpdir), "checkout") # Create a local tar src_tar = os.path.join(str(tmpdir), "a.tar.gz") _assemble_tar(os.path.join(str(datafiles), "content"), srcdir, src_tar) # Track, fetch, build, checkout result = cli.run(project=project, args=["source", "track", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["source", "fetch", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["build", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir]) result.assert_success() # Check that the full content of the tarball is checked out (base-dir: '') original_dir = os.path.join(str(datafiles), "content") original_contents = list_dir_contents(original_dir) checkout_contents = list_dir_contents(checkoutdir) assert checkout_contents == original_contents # Test that a staged checkout matches what was tarred up, with an explicit basedir @pytest.mark.datafiles(os.path.join(DATA_DIR, "explicit-basedir")) @pytest.mark.parametrize("srcdir", ["a", "./a"]) def test_stage_explicit_basedir(cli, tmpdir, datafiles, srcdir): project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) checkoutdir = os.path.join(str(tmpdir), "checkout") # Create a local tar src_tar = os.path.join(str(tmpdir), "a.tar.gz") _assemble_tar(os.path.join(str(datafiles), "content"), srcdir, src_tar) # Track, fetch, build, checkout result = cli.run(project=project, args=["source", "track", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["source", "fetch", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["build", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir]) result.assert_success() # Check that the content of the first directory is checked out (base-dir: '*') original_dir = os.path.join(str(datafiles), "content", "a") original_contents = list_dir_contents(original_dir) checkout_contents = list_dir_contents(checkoutdir) assert checkout_contents == original_contents # Test that we succeed to extract tarballs with hardlinks when stripping the # leading paths @pytest.mark.datafiles(os.path.join(DATA_DIR, "contains-links")) def test_stage_contains_links(cli, tmpdir, datafiles): project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) checkoutdir = os.path.join(str(tmpdir), "checkout") # Create a local tar src_tar = os.path.join(str(tmpdir), "a.tar.gz") # Create a hardlink, we wont trust git to store that info for us os.makedirs(os.path.join(str(datafiles), "content", "base-directory", "subdir2"), exist_ok=True) file1 = os.path.join(str(datafiles), "content", "base-directory", "subdir1", "file.txt") file2 = os.path.join(str(datafiles), "content", "base-directory", "subdir2", "file.txt") os.link(file1, file2) _assemble_tar(os.path.join(str(datafiles), "content"), "base-directory", src_tar) # Track, fetch, build, checkout result = cli.run(project=project, args=["source", "track", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["source", "fetch", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["build", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir]) result.assert_success() # Check that the content of the first directory is checked out (base-dir: '*') original_dir = os.path.join(str(datafiles), "content", "base-directory") original_contents = list_dir_contents(original_dir) checkout_contents = list_dir_contents(checkoutdir) assert checkout_contents == original_contents @pytest.mark.skipif(not HAVE_LZIP, reason="lzip is not available") @pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch")) @pytest.mark.parametrize("srcdir", ["a", "./a"]) def test_stage_default_basedir_lzip(cli, tmpdir, datafiles, srcdir): project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) checkoutdir = os.path.join(str(tmpdir), "checkout") # Create a local tar src_tar = os.path.join(str(tmpdir), "a.tar.lz") _assemble_tar_lz(os.path.join(str(datafiles), "content"), srcdir, src_tar) # Track, fetch, build, checkout result = cli.run(project=project, args=["source", "track", "target-lz.bst"]) result.assert_success() result = cli.run(project=project, args=["source", "fetch", "target-lz.bst"]) result.assert_success() result = cli.run(project=project, args=["build", "target-lz.bst"]) result.assert_success() result = cli.run(project=project, args=["artifact", "checkout", "target-lz.bst", "--directory", checkoutdir]) result.assert_success() # Check that the content of the first directory is checked out (base-dir: '*') original_dir = os.path.join(str(datafiles), "content", "a") original_contents = list_dir_contents(original_dir) checkout_contents = list_dir_contents(checkoutdir) assert checkout_contents == original_contents # Test that tarballs with read-only files work # a - contains read-only files in a writable directory # b - root directory has read-only permission # c - contains one file that has no read nor write permissions. Base-dir set to '' to extract root of tarball @pytest.mark.datafiles(os.path.join(DATA_DIR, "read-only")) @pytest.mark.parametrize("tar_name, base_dir", [("a", "*"), ("b", "*"), ("c", "")]) def test_read_only_dir(cli, tmpdir, datafiles, tar_name, base_dir): try: project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) tar_file = "{}.tar.gz".format(tar_name) generate_element( project, "target.bst", { "kind": "import", "sources": [{"kind": "tar", "url": "tmpdir:/{}".format(tar_file), "ref": "foo", "base-dir": base_dir}], }, ) # Get the tarball in tests/sources/tar/read-only/content # # NOTE that we need to do this because tarfile.open and tar.add() # are packing the tar up with writeable files and dirs tarball = os.path.join(str(datafiles), "content", tar_file) if not os.path.exists(tarball): raise FileNotFoundError("{} does not exist".format(tarball)) copyfile(tarball, os.path.join(str(tmpdir), tar_file)) # Because this test can potentially leave directories behind # which are difficult to remove, ask buildstream to use # our temp directory, so we can clean up. tmpdir_str = str(tmpdir) if not tmpdir_str.endswith(os.path.sep): tmpdir_str += os.path.sep env = {"TMP": tmpdir_str} # Track, fetch, build, checkout result = cli.run(project=project, args=["source", "track", "target.bst"], env=env) result.assert_success() result = cli.run(project=project, args=["source", "fetch", "target.bst"], env=env) result.assert_success() result = cli.run(project=project, args=["build", "target.bst"], env=env) result.assert_success() finally: utils._force_rmtree(str(tmpdir)) @pytest.mark.parametrize("server_type", ("FTP", "HTTP")) @pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch")) def test_use_netrc(cli, datafiles, server_type, tmpdir): file_server_files = os.path.join(str(tmpdir), "file_server") fake_home = os.path.join(str(tmpdir), "fake_home") os.makedirs(file_server_files, exist_ok=True) os.makedirs(fake_home, exist_ok=True) project = str(datafiles) checkoutdir = os.path.join(str(tmpdir), "checkout") os.environ["HOME"] = fake_home with open(os.path.join(fake_home, ".netrc"), "wb") as f: os.fchmod(f.fileno(), 0o700) f.write(b"machine 127.0.0.1\n") f.write(b"login testuser\n") f.write(b"password 12345\n") with create_file_server(server_type) as server: server.add_user("testuser", "12345", file_server_files) generate_project(project, config={"aliases": {"tmpdir": server.base_url()}}) src_tar = os.path.join(file_server_files, "a.tar.gz") _assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar) server.start() result = cli.run(project=project, args=["source", "track", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["source", "fetch", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["build", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir]) result.assert_success() original_dir = os.path.join(str(datafiles), "content", "a") original_contents = list_dir_contents(original_dir) checkout_contents = list_dir_contents(checkoutdir) assert checkout_contents == original_contents @pytest.mark.parametrize("server_type", ("FTP", "HTTP")) @pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch")) def test_netrc_already_specified_user(cli, datafiles, server_type, tmpdir): file_server_files = os.path.join(str(tmpdir), "file_server") fake_home = os.path.join(str(tmpdir), "fake_home") os.makedirs(file_server_files, exist_ok=True) os.makedirs(fake_home, exist_ok=True) project = str(datafiles) os.environ["HOME"] = fake_home with open(os.path.join(fake_home, ".netrc"), "wb") as f: os.fchmod(f.fileno(), 0o700) f.write(b"machine 127.0.0.1\n") f.write(b"login testuser\n") f.write(b"password 12345\n") with create_file_server(server_type) as server: server.add_user("otheruser", "12345", file_server_files) parts = urllib.parse.urlsplit(server.base_url()) base_url = urllib.parse.urlunsplit([parts[0], "otheruser@{}".format(parts[1]), *parts[2:]]) generate_project(project, config={"aliases": {"tmpdir": base_url}}) src_tar = os.path.join(file_server_files, "a.tar.gz") _assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar) server.start() result = cli.run(project=project, args=["source", "track", "target.bst"]) result.assert_main_error(ErrorDomain.STREAM, None) result.assert_task_error(ErrorDomain.SOURCE, None) # Test that BuildStream doesnt crash if HOME is unset while # the netrc module is trying to find it's ~/.netrc file. @pytest.mark.datafiles(os.path.join(DATA_DIR, "fetch")) def test_homeless_environment(cli, tmpdir, datafiles): project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) # Create a local tar src_tar = os.path.join(str(tmpdir), "a.tar.gz") _assemble_tar(os.path.join(str(datafiles), "content"), "a", src_tar) # Use a track, make sure the plugin tries to find a ~/.netrc result = cli.run(project=project, args=["source", "track", "target.bst"], env={"HOME": None}) result.assert_success() @pytest.mark.datafiles(os.path.join(DATA_DIR, "out-of-basedir-hardlinks")) def test_out_of_basedir_hardlinks(cli, tmpdir, datafiles): def ensure_link(member): # By default, python will simply duplicate files - we want # hardlinks! if member.path == "contents/to_extract/a": member.type = tarfile.LNKTYPE member.linkname = "contents/elsewhere/a" return member project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) checkoutdir = os.path.join(str(tmpdir), "checkout") # Create a tarball with an odd hardlink src_tar = os.path.join(str(tmpdir), "contents.tar.gz") old_dir = os.getcwd() os.chdir(str(tmpdir)) with tarfile.open(src_tar, "w:gz") as tar: # Don't recursively add `contents` as the order is not guaranteed. # We need to add `elsewhere` before `to_extract` as the latter # references the former in `linkname`. tar.add("contents", recursive=False) tar.add("contents/elsewhere") tar.add("contents/to_extract", filter=ensure_link) os.chdir(old_dir) # Make sure our tarfile is actually created with the desired # attributes set with tarfile.open(src_tar, "r:gz") as tar: assert any( member.islnk() and member.path == "contents/to_extract/a" and member.linkname == "contents/elsewhere/a" for member in tar.getmembers() ) # Assert that we will actually create a singular copy of the file result = cli.run(project=project, args=["source", "track", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["source", "fetch", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["build", "target.bst"]) result.assert_success() result = cli.run(project=project, args=["artifact", "checkout", "target.bst", "--directory", checkoutdir]) result.assert_success() original_dir = os.path.join(str(datafiles), "contents", "to_extract") original_contents = list_dir_contents(original_dir) checkout_contents = list_dir_contents(checkoutdir) assert checkout_contents == original_contents @pytest.mark.datafiles(os.path.join(DATA_DIR, "out-of-basedir-hardlinks")) def test_malicious_out_of_basedir_hardlinks(cli, tmpdir, datafiles): project = str(datafiles) generate_project(project, config={"aliases": {"tmpdir": "file:///" + str(tmpdir)}}) # Create a maliciously-hardlinked tarball def ensure_link(member): # By default, python will simply duplicate files - we want # hardlinks! if member.path == "contents/elsewhere/malicious": member.type = tarfile.LNKTYPE # This should not be allowed member.linkname = "../../../malicious_target.bst" return member src_tar = os.path.join(str(tmpdir), "contents.tar.gz") old_dir = os.getcwd() os.chdir(str(tmpdir)) with tarfile.open(src_tar, "w:gz") as tar: tar.add("contents", filter=ensure_link) os.chdir(old_dir) # Make sure our tarfile is actually created with the desired # attributes set with tarfile.open(src_tar, "r:gz") as tar: assert any( member.islnk() and member.path == "contents/elsewhere/malicious" and member.linkname == "../../../malicious_target.bst" for member in tar.getmembers() ) # Try to execute the exploit result = cli.run(project=project, args=["source", "track", "malicious_target.bst"]) result.assert_success() result = cli.run(project=project, args=["source", "fetch", "malicious_target.bst"]) result.assert_main_error(ErrorDomain.STREAM, None)