#-*-coding:utf-8-*- # test_repo.py # Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors # # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php from git.test.lib import ( patch, TestBase, with_rw_repo, fixture, assert_false, assert_equal, assert_true, raises ) from git import ( InvalidGitRepositoryError, Repo, NoSuchPathError, Head, Commit, Tree, IndexFile, Git, Reference, GitDB, Submodule, GitCmdObjectDB, Remote, BadName, GitCommandError ) from git.repo.fun import touch from git.util import join_path_native from git.exc import BadObject from gitdb.util import bin_to_hex from git.compat import string_types from gitdb.test.lib import with_rw_directory import os import sys import tempfile import shutil import itertools from io import BytesIO class TestRepo(TestBase): @raises(InvalidGitRepositoryError) def test_new_should_raise_on_invalid_repo_location(self): Repo(tempfile.gettempdir()) @raises(NoSuchPathError) def test_new_should_raise_on_non_existant_path(self): Repo("repos/foobar") @with_rw_repo('0.3.2.1') def test_repo_creation_from_different_paths(self, rw_repo): r_from_gitdir = Repo(rw_repo.git_dir) assert r_from_gitdir.git_dir == rw_repo.git_dir assert r_from_gitdir.git_dir.endswith('.git') assert not rw_repo.git.working_dir.endswith('.git') assert r_from_gitdir.git.working_dir == rw_repo.git.working_dir def test_description(self): txt = "Test repository" self.rorepo.description = txt assert_equal(self.rorepo.description, txt) def test_heads_should_return_array_of_head_objects(self): for head in self.rorepo.heads: assert_equal(Head, head.__class__) def test_heads_should_populate_head_data(self): for head in self.rorepo.heads: assert head.name assert isinstance(head.commit, Commit) # END for each head assert isinstance(self.rorepo.heads.master, Head) assert isinstance(self.rorepo.heads['master'], Head) def test_tree_from_revision(self): tree = self.rorepo.tree('0.1.6') assert len(tree.hexsha) == 40 assert tree.type == "tree" assert self.rorepo.tree(tree) == tree # try from invalid revision that does not exist self.failUnlessRaises(BadName, self.rorepo.tree, 'hello world') def test_commit_from_revision(self): commit = self.rorepo.commit('0.1.4') assert commit.type == 'commit' assert self.rorepo.commit(commit) == commit def test_commits(self): mc = 10 commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc)) assert len(commits) == mc c = commits[0] assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha) assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents]) assert_equal("ce41fc29549042f1aa09cc03174896cf23f112e3", c.tree.hexsha) assert_equal("Michael Trier", c.author.name) assert_equal("mtrier@gmail.com", c.author.email) assert_equal(1232829715, c.authored_date) assert_equal(5 * 3600, c.author_tz_offset) assert_equal("Michael Trier", c.committer.name) assert_equal("mtrier@gmail.com", c.committer.email) assert_equal(1232829715, c.committed_date) assert_equal(5 * 3600, c.committer_tz_offset) assert_equal("Bumped version 0.1.6\n", c.message) c = commits[1] assert isinstance(c.parents, tuple) def test_trees(self): mc = 30 num_trees = 0 for tree in self.rorepo.iter_trees('0.1.5', max_count=mc): num_trees += 1 assert isinstance(tree, Tree) # END for each tree assert num_trees == mc def _assert_empty_repo(self, repo): # test all kinds of things with an empty, freshly initialized repo. # It should throw good errors # entries should be empty assert len(repo.index.entries) == 0 # head is accessible assert repo.head assert repo.head.ref assert not repo.head.is_valid() # we can change the head to some other ref head_ref = Head.from_path(repo, Head.to_full_path('some_head')) assert not head_ref.is_valid() repo.head.ref = head_ref # is_dirty can handle all kwargs for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)): assert not repo.is_dirty(*args) # END for each arg # we can add a file to the index ( if we are not bare ) if not repo.bare: pass # END test repos with working tree def test_init(self): prev_cwd = os.getcwd() os.chdir(tempfile.gettempdir()) git_dir_rela = "repos/foo/bar.git" del_dir_abs = os.path.abspath("repos") git_dir_abs = os.path.abspath(git_dir_rela) try: # with specific path for path in (git_dir_rela, git_dir_abs): r = Repo.init(path=path, bare=True) assert isinstance(r, Repo) assert r.bare is True assert not r.has_separate_working_tree() assert os.path.isdir(r.git_dir) self._assert_empty_repo(r) # test clone clone_path = path + "_clone" rc = r.clone(clone_path) self._assert_empty_repo(rc) try: shutil.rmtree(clone_path) except OSError: # when relative paths are used, the clone may actually be inside # of the parent directory pass # END exception handling # try again, this time with the absolute version rc = Repo.clone_from(r.git_dir, clone_path) self._assert_empty_repo(rc) shutil.rmtree(git_dir_abs) try: shutil.rmtree(clone_path) except OSError: # when relative paths are used, the clone may actually be inside # of the parent directory pass # END exception handling # END for each path os.makedirs(git_dir_rela) os.chdir(git_dir_rela) r = Repo.init(bare=False) assert r.bare is False assert not r.has_separate_working_tree() self._assert_empty_repo(r) finally: try: shutil.rmtree(del_dir_abs) except OSError: pass os.chdir(prev_cwd) # END restore previous state def test_bare_property(self): self.rorepo.bare def test_daemon_export(self): orig_val = self.rorepo.daemon_export self.rorepo.daemon_export = not orig_val assert self.rorepo.daemon_export == (not orig_val) self.rorepo.daemon_export = orig_val assert self.rorepo.daemon_export == orig_val def test_alternates(self): cur_alternates = self.rorepo.alternates # empty alternates self.rorepo.alternates = [] assert self.rorepo.alternates == [] alts = ["other/location", "this/location"] self.rorepo.alternates = alts assert alts == self.rorepo.alternates self.rorepo.alternates = cur_alternates def test_repr(self): assert repr(self.rorepo).startswith(' 1) # END for each item to traverse assert c, "Should have executed at least one blame command" assert nml, "There should at least be one blame commit that contains multiple lines" @patch.object(Git, '_call_process') def test_blame_complex_revision(self, git): git.return_value = fixture('blame_complex_revision') res = self.rorepo.blame("HEAD~10..HEAD", "README.md") assert len(res) == 1 assert len(res[0][1]) == 83, "Unexpected amount of parsed blame lines" @with_rw_repo('HEAD', bare=False) def test_untracked_files(self, rwrepo): for (run, repo_add) in enumerate((rwrepo.index.add, rwrepo.git.add)): base = rwrepo.working_tree_dir files = (join_path_native(base, u"%i_test _myfile" % run), join_path_native(base, "%i_test_other_file" % run), join_path_native(base, u"%i__çava verböten" % run), join_path_native(base, u"%i_çava-----verböten" % run)) num_recently_untracked = 0 for fpath in files: fd = open(fpath, "wb") fd.close() # END for each filename untracked_files = rwrepo.untracked_files num_recently_untracked = len(untracked_files) # assure we have all names - they are relative to the git-dir num_test_untracked = 0 for utfile in untracked_files: num_test_untracked += join_path_native(base, utfile) in files assert len(files) == num_test_untracked repo_add(untracked_files) assert len(rwrepo.untracked_files) == (num_recently_untracked - len(files)) # end for each run def test_config_reader(self): reader = self.rorepo.config_reader() # all config files assert reader.read_only reader = self.rorepo.config_reader("repository") # single config file assert reader.read_only def test_config_writer(self): for config_level in self.rorepo.config_level: try: writer = self.rorepo.config_writer(config_level) assert not writer.read_only writer.release() except IOError: # its okay not to get a writer for some configuration files if we # have no permissions pass # END for each config level def test_config_level_paths(self): for config_level in self.rorepo.config_level: assert self.rorepo._get_config_path(config_level) # end for each config level def test_creation_deletion(self): # just a very quick test to assure it generally works. There are # specialized cases in the test_refs module head = self.rorepo.create_head("new_head", "HEAD~1") self.rorepo.delete_head(head) tag = self.rorepo.create_tag("new_tag", "HEAD~2") self.rorepo.delete_tag(tag) writer = self.rorepo.config_writer() writer.release() remote = self.rorepo.create_remote("new_remote", "git@server:repo.git") self.rorepo.delete_remote(remote) def test_comparison_and_hash(self): # this is only a preliminary test, more testing done in test_index assert self.rorepo == self.rorepo and not (self.rorepo != self.rorepo) assert len(set((self.rorepo, self.rorepo))) == 1 @with_rw_directory def test_tilde_and_env_vars_in_repo_path(self, rw_dir): ph = os.environ['HOME'] try: os.environ['HOME'] = rw_dir Repo.init(os.path.join('~', 'test.git'), bare=True) os.environ['FOO'] = rw_dir Repo.init(os.path.join('$FOO', 'test.git'), bare=True) finally: os.environ['HOME'] = ph del os.environ['FOO'] # end assure HOME gets reset to what it was def test_git_cmd(self): # test CatFileContentStream, just to be very sure we have no fencepost errors # last \n is the terminating newline that it expects l1 = b"0123456789\n" l2 = b"abcdefghijklmnopqrstxy\n" l3 = b"z\n" d = l1 + l2 + l3 + b"\n" l1p = l1[:5] # full size # size is without terminating newline def mkfull(): return Git.CatFileContentStream(len(d) - 1, BytesIO(d)) ts = 5 def mktiny(): return Git.CatFileContentStream(ts, BytesIO(d)) # readlines no limit s = mkfull() lines = s.readlines() assert len(lines) == 3 and lines[-1].endswith(b'\n') assert s._stream.tell() == len(d) # must have scrubbed to the end # realines line limit s = mkfull() lines = s.readlines(5) assert len(lines) == 1 # readlines on tiny sections s = mktiny() lines = s.readlines() assert len(lines) == 1 and lines[0] == l1p assert s._stream.tell() == ts + 1 # readline no limit s = mkfull() assert s.readline() == l1 assert s.readline() == l2 assert s.readline() == l3 assert s.readline() == '' assert s._stream.tell() == len(d) # readline limit s = mkfull() assert s.readline(5) == l1p assert s.readline() == l1[5:] # readline on tiny section s = mktiny() assert s.readline() == l1p assert s.readline() == '' assert s._stream.tell() == ts + 1 # read no limit s = mkfull() assert s.read() == d[:-1] assert s.read() == '' assert s._stream.tell() == len(d) # read limit s = mkfull() assert s.read(5) == l1p assert s.read(6) == l1[5:] assert s._stream.tell() == 5 + 6 # its not yet done # read tiny s = mktiny() assert s.read(2) == l1[:2] assert s._stream.tell() == 2 assert s.read() == l1[2:ts] assert s._stream.tell() == ts + 1 def _assert_rev_parse_types(self, name, rev_obj): rev_parse = self.rorepo.rev_parse if rev_obj.type == 'tag': rev_obj = rev_obj.object # tree and blob type obj = rev_parse(name + '^{tree}') assert obj == rev_obj.tree obj = rev_parse(name + ':CHANGES') assert obj.type == 'blob' and obj.path == 'CHANGES' assert rev_obj.tree['CHANGES'] == obj def _assert_rev_parse(self, name): """tries multiple different rev-parse syntaxes with the given name :return: parsed object""" rev_parse = self.rorepo.rev_parse orig_obj = rev_parse(name) if orig_obj.type == 'tag': obj = orig_obj.object else: obj = orig_obj # END deref tags by default # try history rev = name + "~" obj2 = rev_parse(rev) assert obj2 == obj.parents[0] self._assert_rev_parse_types(rev, obj2) # history with number ni = 11 history = [obj.parents[0]] for pn in range(ni): history.append(history[-1].parents[0]) # END get given amount of commits for pn in range(11): rev = name + "~%i" % (pn + 1) obj2 = rev_parse(rev) assert obj2 == history[pn] self._assert_rev_parse_types(rev, obj2) # END history check # parent ( default ) rev = name + "^" obj2 = rev_parse(rev) assert obj2 == obj.parents[0] self._assert_rev_parse_types(rev, obj2) # parent with number for pn, parent in enumerate(obj.parents): rev = name + "^%i" % (pn + 1) assert rev_parse(rev) == parent self._assert_rev_parse_types(rev, parent) # END for each parent return orig_obj @with_rw_repo('HEAD', bare=False) def test_rw_rev_parse(self, rwrepo): # verify it does not confuse branches with hexsha ids ahead = rwrepo.create_head('aaaaaaaa') assert(rwrepo.rev_parse(str(ahead)) == ahead.commit) def test_rev_parse(self): rev_parse = self.rorepo.rev_parse # try special case: This one failed at some point, make sure its fixed assert rev_parse("33ebe").hexsha == "33ebe7acec14b25c5f84f35a664803fcab2f7781" # start from reference num_resolved = 0 for ref_no, ref in enumerate(Reference.iter_items(self.rorepo)): path_tokens = ref.path.split("/") for pt in range(len(path_tokens)): path_section = '/'.join(path_tokens[-(pt + 1):]) try: obj = self._assert_rev_parse(path_section) assert obj.type == ref.object.type num_resolved += 1 except (BadName, BadObject): print("failed on %s" % path_section) # is fine, in case we have something like 112, which belongs to remotes/rname/merge-requests/112 pass # END exception handling # END for each token if ref_no == 3 - 1: break # END for each reference assert num_resolved # it works with tags ! tag = self._assert_rev_parse('0.1.4') assert tag.type == 'tag' # try full sha directly ( including type conversion ) assert tag.object == rev_parse(tag.object.hexsha) self._assert_rev_parse_types(tag.object.hexsha, tag.object) # multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES rev = '0.1.4^{tree}^{tree}' assert rev_parse(rev) == tag.object.tree assert rev_parse(rev + ':CHANGES') == tag.object.tree['CHANGES'] # try to get parents from first revision - it should fail as no such revision # exists first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781" commit = rev_parse(first_rev) assert len(commit.parents) == 0 assert commit.hexsha == first_rev self.failUnlessRaises(BadName, rev_parse, first_rev + "~") self.failUnlessRaises(BadName, rev_parse, first_rev + "^") # short SHA1 commit2 = rev_parse(first_rev[:20]) assert commit2 == commit commit2 = rev_parse(first_rev[:5]) assert commit2 == commit # todo: dereference tag into a blob 0.1.7^{blob} - quite a special one # needs a tag which points to a blob # ref^0 returns commit being pointed to, same with ref~0, and ^{} tag = rev_parse('0.1.4') for token in (('~0', '^0', '^{}')): assert tag.object == rev_parse('0.1.4%s' % token) # END handle multiple tokens # try partial parsing max_items = 40 for i, binsha in enumerate(self.rorepo.odb.sha_iter()): assert rev_parse(bin_to_hex(binsha)[:8 - (i % 2)].decode('ascii')).binsha == binsha if i > max_items: # this is rather slow currently, as rev_parse returns an object # which requires accessing packs, it has some additional overhead break # END for each binsha in repo # missing closing brace commit^{tree self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree') # missing starting brace self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}') # REVLOG ####### head = self.rorepo.head # need to specify a ref when using the @ syntax self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha) # uses HEAD.ref by default assert rev_parse('@{0}') == head.commit if not head.is_detached: refspec = '%s@{0}' % head.ref.name assert rev_parse(refspec) == head.ref.commit # all additional specs work as well assert rev_parse(refspec + "^{tree}") == head.commit.tree assert rev_parse(refspec + ":CHANGES").type == 'blob' # END operate on non-detached head # position doesn't exist self.failUnlessRaises(IndexError, rev_parse, '@{10000}') # currently, nothing more is supported self.failUnlessRaises(NotImplementedError, rev_parse, "@{1 week ago}") # the last position assert rev_parse('@{1}') != head.commit def test_repo_odbtype(self): target_type = GitDB if sys.version_info[:2] < (2, 5): target_type = GitCmdObjectDB assert isinstance(self.rorepo.odb, target_type) def test_submodules(self): assert len(self.rorepo.submodules) == 1 # non-recursive assert len(list(self.rorepo.iter_submodules())) >= 2 assert isinstance(self.rorepo.submodule("gitdb"), Submodule) self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist") @with_rw_repo('HEAD', bare=False) def test_submodule_update(self, rwrepo): # fails in bare mode rwrepo._bare = True self.failUnlessRaises(InvalidGitRepositoryError, rwrepo.submodule_update) rwrepo._bare = False # test create submodule sm = rwrepo.submodules[0] sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path)) assert isinstance(sm, Submodule) # note: the rest of this functionality is tested in test_submodule @with_rw_repo('HEAD') def test_git_file(self, rwrepo): # Move the .git directory to another location and create the .git file. real_path_abs = os.path.abspath(join_path_native(rwrepo.working_tree_dir, '.real')) os.rename(rwrepo.git_dir, real_path_abs) git_file_path = join_path_native(rwrepo.working_tree_dir, '.git') open(git_file_path, 'wb').write(fixture('git_file')) # Create a repo and make sure it's pointing to the relocated .git directory. git_file_repo = Repo(rwrepo.working_tree_dir) assert os.path.abspath(git_file_repo.git_dir) == real_path_abs # Test using an absolute gitdir path in the .git file. open(git_file_path, 'wb').write(('gitdir: %s\n' % real_path_abs).encode('ascii')) git_file_repo = Repo(rwrepo.working_tree_dir) assert os.path.abspath(git_file_repo.git_dir) == real_path_abs def test_file_handle_leaks(self): def last_commit(repo, rev, path): commit = next(repo.iter_commits(rev, path, max_count=1)) commit.tree[path] # This is based on this comment # https://github.com/gitpython-developers/GitPython/issues/60#issuecomment-23558741 # And we expect to set max handles to a low value, like 64 # You should set ulimit -n X, see .travis.yml # The loops below would easily create 500 handles if these would leak (4 pipes + multiple mapped files) for i in range(64): for repo_type in (GitCmdObjectDB, GitDB): repo = Repo(self.rorepo.working_tree_dir, odbt=repo_type) last_commit(repo, 'master', 'git/test/test_base.py') # end for each repository type # end for each iteration def test_remote_method(self): self.failUnlessRaises(ValueError, self.rorepo.remote, 'foo-blue') assert isinstance(self.rorepo.remote(name='origin'), Remote) @with_rw_directory def test_empty_repo(self, rw_dir): """Assure we can handle empty repositories""" r = Repo.init(rw_dir, mkdir=False) # It's ok not to be able to iterate a commit, as there is none self.failUnlessRaises(ValueError, r.iter_commits) assert r.active_branch.name == 'master' assert not r.active_branch.is_valid(), "Branch is yet to be born" # actually, when trying to create a new branch without a commit, git itself fails # We should, however, not fail ungracefully self.failUnlessRaises(BadName, r.create_head, 'foo') self.failUnlessRaises(BadName, r.create_head, 'master') # It's expected to not be able to access a tree self.failUnlessRaises(ValueError, r.tree) new_file_path = os.path.join(rw_dir, "new_file.ext") touch(new_file_path) r.index.add([new_file_path]) r.index.commit("initial commit") # Now a branch should be creatable nb = r.create_head('foo') assert nb.is_valid() def test_merge_base(self): repo = self.rorepo c1 = 'f6aa8d1' c2 = repo.commit('d46e3fe') c3 = '763ef75' self.failUnlessRaises(ValueError, repo.merge_base) self.failUnlessRaises(ValueError, repo.merge_base, 'foo') # two commit merge-base res = repo.merge_base(c1, c2) assert isinstance(res, list) and len(res) == 1 and isinstance(res[0], Commit) assert res[0].hexsha.startswith('3936084') for kw in ('a', 'all'): res = repo.merge_base(c1, c2, c3, **{kw: True}) assert isinstance(res, list) and len(res) == 1 # end for eaech keyword signalling all merge-bases to be returned # Test for no merge base - can't do as we have self.failUnlessRaises(GitCommandError, repo.merge_base, c1, 'ffffff') def test_is_ancestor(self): repo = self.rorepo c1 = 'f6aa8d1' c2 = '763ef75' self.assertTrue(repo.is_ancestor(c1, c1)) self.assertTrue(repo.is_ancestor("master", "master")) self.assertTrue(repo.is_ancestor(c1, c2)) self.assertTrue(repo.is_ancestor(c1, "master")) self.assertFalse(repo.is_ancestor(c2, c1)) self.assertFalse(repo.is_ancestor("master", c1)) for i, j in itertools.permutations([c1, 'ffffff', ''], r=2): self.assertRaises(GitCommandError, repo.is_ancestor, i, j)