summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKostis Anagnostopoulos <ankostis@gmail.com>2016-10-01 16:02:20 +0200
committerKostis Anagnostopoulos <ankostis@gmail.com>2016-10-01 16:33:20 +0200
commitb8b025f719b2c3203e194580bbd0785a26c08ebd (patch)
tree8cec9ba13035f9489fdaa56e550e93bc976372fc
parenta79cf677744e2c1721fa55f934fa07034bc54b0a (diff)
downloadgitpython-b8b025f719b2c3203e194580bbd0785a26c08ebd.tar.gz
Win, #519: FIX repo TCs.
+ FIX TestRepo.test_submodule_update(): + submod: del `.git` file prior overwrite; Windows denied otherwise! + FIX TestRepo.test_untracked_files(): + In the `git add <file>` case, it failed with unicode args on PY2. Had to encode them with `locale.getpreferredencoding()` AND use SHELL. + cmd: add `shell` into `execute()` kwds, for overriding USE_SHELL per command. + repo: replace blocky `communicate()` in `_clone()` with thread-pumps. + test_repo.py: unittestize (almost all) assertions. + Replace open --> with open for index (base and TC). + test_index.py: Enabled a dormant assertion.
-rw-r--r--git/cmd.py12
-rw-r--r--git/compat.py13
-rw-r--r--git/index/base.py15
-rw-r--r--git/objects/submodule/base.py21
-rw-r--r--git/repo/base.py8
-rw-r--r--git/test/test_index.py35
-rw-r--r--git/test/test_repo.py213
-rw-r--r--git/test/test_submodule.py3
8 files changed, 177 insertions, 143 deletions
diff --git a/git/cmd.py b/git/cmd.py
index b47b2a02..f4f5f99a 100644
--- a/git/cmd.py
+++ b/git/cmd.py
@@ -45,7 +45,7 @@ from .util import (
execute_kwargs = set(('istream', 'with_keep_cwd', 'with_extended_output',
'with_exceptions', 'as_process', 'stdout_as_string',
'output_stream', 'with_stdout', 'kill_after_timeout',
- 'universal_newlines'))
+ 'universal_newlines', 'shell'))
log = logging.getLogger('git.cmd')
log.addHandler(logging.NullHandler())
@@ -176,8 +176,8 @@ class Git(LazyMixin):
GIT_PYTHON_GIT_EXECUTABLE = os.environ.get(_git_exec_env_var, git_exec_name)
# If True, a shell will be used when executing git commands.
- # This should only be desirable on windows, see https://github.com/gitpython-developers/GitPython/pull/126
- # for more information
+ # This should only be desirable on Windows, see https://github.com/gitpython-developers/GitPython/pull/126
+ # and check `git/test_repo.py:TestRepo.test_untracked_files()` TC for an example where it is required.
# Override this value using `Git.USE_SHELL = True`
USE_SHELL = False
@@ -422,6 +422,7 @@ class Git(LazyMixin):
kill_after_timeout=None,
with_stdout=True,
universal_newlines=False,
+ shell=None,
**subprocess_kwargs
):
"""Handles executing the command on the shell and consumes and returns
@@ -479,6 +480,9 @@ class Git(LazyMixin):
:param universal_newlines:
if True, pipes will be opened as text, and lines are split at
all known line endings.
+ :param shell:
+ Whether to invoke commands through a shell (see `Popen(..., shell=True)`).
+ It overrides :attr:`USE_SHELL` if it is not `None`.
:param kill_after_timeout:
To specify a timeout in seconds for the git command, after which the process
should be killed. This will have no effect if as_process is set to True. It is
@@ -544,7 +548,7 @@ class Git(LazyMixin):
stdin=istream,
stderr=PIPE,
stdout=PIPE if with_stdout else open(os.devnull, 'wb'),
- shell=self.USE_SHELL,
+ shell=shell is not None and shell or self.USE_SHELL,
close_fds=(is_posix), # unsupported on windows
universal_newlines=universal_newlines,
creationflags=PROC_CREATIONFLAGS,
diff --git a/git/compat.py b/git/compat.py
index e760575d..441a3761 100644
--- a/git/compat.py
+++ b/git/compat.py
@@ -7,6 +7,7 @@
"""utilities to help provide compatibility with python 3"""
# flake8: noqa
+import locale
import os
import sys
@@ -15,7 +16,6 @@ from gitdb.utils.compat import (
MAXSIZE,
izip,
)
-
from gitdb.utils.encoding import (
string_types,
text_type,
@@ -23,6 +23,7 @@ from gitdb.utils.encoding import (
force_text
)
+
PY3 = sys.version_info[0] >= 3
is_win = (os.name == 'nt')
is_posix = (os.name == 'posix')
@@ -76,6 +77,16 @@ def safe_encode(s):
raise TypeError('Expected bytes or text, but got %r' % (s,))
+def win_encode(s):
+ """Encode unicodes for process arguments on Windows."""
+ if isinstance(s, unicode):
+ return s.encode(locale.getpreferredencoding(False))
+ elif isinstance(s, bytes):
+ return s
+ elif s is not None:
+ raise TypeError('Expected bytes or text, but got %r' % (s,))
+
+
def with_metaclass(meta, *bases):
"""copied from https://github.com/Byron/bcore/blob/master/src/python/butility/future.py#L15"""
class metaclass(meta):
diff --git a/git/index/base.py b/git/index/base.py
index d7d9fc3a..9b6d28ab 100644
--- a/git/index/base.py
+++ b/git/index/base.py
@@ -214,8 +214,8 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
self.entries
lfd = LockedFD(file_path or self._file_path)
stream = lfd.open(write=True, stream=True)
- ok = False
+ ok = False
try:
self._serialize(stream, ignore_extension_data)
ok = True
@@ -602,14 +602,13 @@ class IndexFile(LazyMixin, diff.Diffable, Serializable):
stream = None
if S_ISLNK(st.st_mode):
# in PY3, readlink is string, but we need bytes. In PY2, it's just OS encoded bytes, we assume UTF-8
- stream = BytesIO(force_bytes(os.readlink(filepath), encoding=defenc))
+ open_stream = lambda: BytesIO(force_bytes(os.readlink(filepath), encoding=defenc))
else:
- stream = open(filepath, 'rb')
- # END handle stream
- fprogress(filepath, False, filepath)
- istream = self.repo.odb.store(IStream(Blob.type, st.st_size, stream))
- fprogress(filepath, True, filepath)
- stream.close()
+ open_stream = lambda: open(filepath, 'rb')
+ with open_stream() as stream:
+ fprogress(filepath, False, filepath)
+ istream = self.repo.odb.store(IStream(Blob.type, st.st_size, stream))
+ fprogress(filepath, True, filepath)
return BaseIndexEntry((stat_mode_to_index_mode(st.st_mode),
istream.binsha, 0, to_native_path_linux(filepath)))
diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py
index fb5f774d..3196ef8f 100644
--- a/git/objects/submodule/base.py
+++ b/git/objects/submodule/base.py
@@ -29,7 +29,8 @@ from git.exc import (
)
from git.compat import (
string_types,
- defenc
+ defenc,
+ is_win,
)
import stat
@@ -289,14 +290,16 @@ class Submodule(util.IndexObject, Iterable, Traversable):
"""
git_file = os.path.join(working_tree_dir, '.git')
rela_path = os.path.relpath(module_abspath, start=working_tree_dir)
- fp = open(git_file, 'wb')
- fp.write(("gitdir: %s" % rela_path).encode(defenc))
- fp.close()
-
- writer = GitConfigParser(os.path.join(module_abspath, 'config'), read_only=False, merge_includes=False)
- writer.set_value('core', 'worktree',
- to_native_path_linux(os.path.relpath(working_tree_dir, start=module_abspath)))
- writer.release()
+ if is_win:
+ if os.path.isfile(git_file):
+ os.remove(git_file)
+ with open(git_file, 'wb') as fp:
+ fp.write(("gitdir: %s" % rela_path).encode(defenc))
+
+ with GitConfigParser(os.path.join(module_abspath, 'config'),
+ read_only=False, merge_includes=False) as writer:
+ writer.set_value('core', 'worktree',
+ to_native_path_linux(os.path.relpath(working_tree_dir, start=module_abspath)))
#{ Edit Interface
diff --git a/git/repo/base.py b/git/repo/base.py
index 9cc70571..947d77d2 100644
--- a/git/repo/base.py
+++ b/git/repo/base.py
@@ -899,12 +899,8 @@ class Repo(object):
try:
proc = git.clone(url, path, with_extended_output=True, as_process=True,
v=True, **add_progress(kwargs, git, progress))
- if progress:
- handle_process_output(proc, None, progress.new_message_handler(), finalize_process)
- else:
- (stdout, stderr) = proc.communicate()
- finalize_process(proc, stderr=stderr)
- # end handle progress
+ progress_handler = progress and progress.new_message_handler() or None
+ handle_process_output(proc, None, progress_handler, finalize_process)
finally:
if prev_cwd is not None:
os.chdir(prev_cwd)
diff --git a/git/test/test_index.py b/git/test/test_index.py
index 46cc990d..1ffbe9e2 100644
--- a/git/test/test_index.py
+++ b/git/test/test_index.py
@@ -113,9 +113,8 @@ class TestIndex(TestBase):
# write the data - it must match the original
tmpfile = tempfile.mktemp()
index_merge.write(tmpfile)
- fp = open(tmpfile, 'rb')
- self.assertEqual(fp.read(), fixture("index_merge"))
- fp.close()
+ with open(tmpfile, 'rb') as fp:
+ self.assertEqual(fp.read(), fixture("index_merge"))
os.remove(tmpfile)
def _cmp_tree_index(self, tree, index):
@@ -329,22 +328,19 @@ class TestIndex(TestBase):
# reset the working copy as well to current head,to pull 'back' as well
new_data = b"will be reverted"
file_path = os.path.join(rw_repo.working_tree_dir, "CHANGES")
- fp = open(file_path, "wb")
- fp.write(new_data)
- fp.close()
+ with open(file_path, "wb") as fp:
+ fp.write(new_data)
index.reset(rev_head_parent, working_tree=True)
assert not index.diff(None)
self.assertEqual(cur_branch, rw_repo.active_branch)
self.assertEqual(cur_commit, rw_repo.head.commit)
- fp = open(file_path, 'rb')
- try:
+ with open(file_path, 'rb') as fp:
assert fp.read() != new_data
- finally:
- fp.close()
# test full checkout
test_file = os.path.join(rw_repo.working_tree_dir, "CHANGES")
- open(test_file, 'ab').write(b"some data")
+ with open(test_file, 'ab') as fd:
+ fd.write(b"some data")
rval = index.checkout(None, force=True, fprogress=self._fprogress)
assert 'CHANGES' in list(rval)
self._assert_fprogress([None])
@@ -369,9 +365,8 @@ class TestIndex(TestBase):
# checkout file with modifications
append_data = b"hello"
- fp = open(test_file, "ab")
- fp.write(append_data)
- fp.close()
+ with open(test_file, "ab") as fp:
+ fp.write(append_data)
try:
index.checkout(test_file)
except CheckoutError as e:
@@ -380,7 +375,9 @@ class TestIndex(TestBase):
self.assertEqual(len(e.failed_files), len(e.failed_reasons))
self.assertIsInstance(e.failed_reasons[0], string_types)
self.assertEqual(len(e.valid_files), 0)
- assert open(test_file, 'rb').read().endswith(append_data)
+ with open(test_file, 'rb') as fd:
+ s = fd.read()
+ self.assertTrue(s.endswith(append_data), s)
else:
raise AssertionError("Exception CheckoutError not thrown")
@@ -639,9 +636,10 @@ class TestIndex(TestBase):
if is_win:
# simlinks should contain the link as text ( which is what a
# symlink actually is )
- open(fake_symlink_path, 'rb').read() == link_target
+ with open(fake_symlink_path, 'rt') as fd:
+ self.assertEqual(fd.read(), link_target)
else:
- assert S_ISLNK(os.lstat(fake_symlink_path)[ST_MODE])
+ self.assertTrue(S_ISLNK(os.lstat(fake_symlink_path)[ST_MODE]))
# TEST RENAMING
def assert_mv_rval(rval):
@@ -691,7 +689,8 @@ class TestIndex(TestBase):
for fid in range(3):
fname = 'newfile%i' % fid
- open(fname, 'wb').write(b"abcd")
+ with open(fname, 'wb') as fd:
+ fd.write(b"abcd")
yield Blob(rw_repo, Blob.NULL_BIN_SHA, 0o100644, fname)
# END for each new file
# END path producer
diff --git a/git/test/test_repo.py b/git/test/test_repo.py
index e2c18d3f..a37c9be9 100644
--- a/git/test/test_repo.py
+++ b/git/test/test_repo.py
@@ -7,6 +7,7 @@
import glob
from io import BytesIO
import itertools
+import functools as fnt
import os
import pickle
import sys
@@ -29,7 +30,12 @@ from git import (
BadName,
GitCommandError
)
-from git.compat import string_types
+from git.compat import (
+ PY3,
+ is_win,
+ string_types,
+ win_encode,
+)
from git.exc import (
BadObject,
)
@@ -93,10 +99,10 @@ class TestRepo(TestBase):
@with_rw_repo('0.3.2.1')
def test_repo_creation_from_different_paths(self, rw_repo):
r_from_gitdir = Repo(rw_repo.git_dir)
- assert r_from_gitdir.git_dir == rw_repo.git_dir
+ self.assertEqual(r_from_gitdir.git_dir, rw_repo.git_dir)
assert r_from_gitdir.git_dir.endswith('.git')
assert not rw_repo.git.working_dir.endswith('.git')
- assert r_from_gitdir.git.working_dir == rw_repo.git.working_dir
+ self.assertEqual(r_from_gitdir.git.working_dir, rw_repo.git.working_dir)
def test_description(self):
txt = "Test repository"
@@ -110,17 +116,17 @@ class TestRepo(TestBase):
def test_heads_should_populate_head_data(self):
for head in self.rorepo.heads:
assert head.name
- assert isinstance(head.commit, Commit)
+ self.assertIsInstance(head.commit, Commit)
# END for each head
- assert isinstance(self.rorepo.heads.master, Head)
- assert isinstance(self.rorepo.heads['master'], Head)
+ self.assertIsInstance(self.rorepo.heads.master, Head)
+ self.assertIsInstance(self.rorepo.heads['master'], Head)
def test_tree_from_revision(self):
tree = self.rorepo.tree('0.1.6')
- assert len(tree.hexsha) == 40
- assert tree.type == "tree"
- assert self.rorepo.tree(tree) == tree
+ self.assertEqual(len(tree.hexsha), 40)
+ self.assertEqual(tree.type, "tree")
+ self.assertEqual(self.rorepo.tree(tree), tree)
# try from invalid revision that does not exist
self.failUnlessRaises(BadName, self.rorepo.tree, 'hello world')
@@ -130,13 +136,13 @@ class TestRepo(TestBase):
def test_commit_from_revision(self):
commit = self.rorepo.commit('0.1.4')
- assert commit.type == 'commit'
- assert self.rorepo.commit(commit) == commit
+ self.assertEqual(commit.type, 'commit')
+ self.assertEqual(self.rorepo.commit(commit), commit)
def test_commits(self):
mc = 10
commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc))
- assert len(commits) == mc
+ self.assertEqual(len(commits), mc)
c = commits[0]
assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha)
@@ -153,23 +159,23 @@ class TestRepo(TestBase):
assert_equal("Bumped version 0.1.6\n", c.message)
c = commits[1]
- assert isinstance(c.parents, tuple)
+ self.assertIsInstance(c.parents, tuple)
def test_trees(self):
mc = 30
num_trees = 0
for tree in self.rorepo.iter_trees('0.1.5', max_count=mc):
num_trees += 1
- assert isinstance(tree, Tree)
+ self.assertIsInstance(tree, Tree)
# END for each tree
- assert num_trees == mc
+ self.assertEqual(num_trees, mc)
def _assert_empty_repo(self, repo):
# test all kinds of things with an empty, freshly initialized repo.
# It should throw good errors
# entries should be empty
- assert len(repo.index.entries) == 0
+ self.assertEqual(len(repo.index.entries), 0)
# head is accessible
assert repo.head
@@ -201,7 +207,7 @@ class TestRepo(TestBase):
# with specific path
for path in (git_dir_rela, git_dir_abs):
r = Repo.init(path=path, bare=True)
- assert isinstance(r, Repo)
+ self.assertIsInstance(r, Repo)
assert r.bare is True
assert not r.has_separate_working_tree()
assert os.path.isdir(r.git_dir)
@@ -257,18 +263,18 @@ class TestRepo(TestBase):
def test_daemon_export(self):
orig_val = self.rorepo.daemon_export
self.rorepo.daemon_export = not orig_val
- assert self.rorepo.daemon_export == (not orig_val)
+ self.assertEqual(self.rorepo.daemon_export, (not orig_val))
self.rorepo.daemon_export = orig_val
- assert self.rorepo.daemon_export == orig_val
+ self.assertEqual(self.rorepo.daemon_export, orig_val)
def test_alternates(self):
cur_alternates = self.rorepo.alternates
# empty alternates
self.rorepo.alternates = []
- assert self.rorepo.alternates == []
+ self.assertEqual(self.rorepo.alternates, [])
alts = ["other/location", "this/location"]
self.rorepo.alternates = alts
- assert alts == self.rorepo.alternates
+ self.assertEqual(alts, self.rorepo.alternates)
self.rorepo.alternates = cur_alternates
def test_repr(self):
@@ -313,11 +319,11 @@ class TestRepo(TestBase):
assert rwrepo.is_dirty(untracked_files=True, path="doc") is True
def test_head(self):
- assert self.rorepo.head.reference.object == self.rorepo.active_branch.object
+ self.assertEqual(self.rorepo.head.reference.object, self.rorepo.active_branch.object)
def test_index(self):
index = self.rorepo.index
- assert isinstance(index, IndexFile)
+ self.assertIsInstance(index, IndexFile)
def test_tag(self):
assert self.rorepo.tag('refs/tags/0.1.5').commit
@@ -361,7 +367,7 @@ class TestRepo(TestBase):
# BINARY BLAME
git.return_value = fixture('blame_binary')
blames = self.rorepo.blame('master', 'rps')
- assert len(blames) == 2
+ self.assertEqual(len(blames), 2)
def test_blame_real(self):
c = 0
@@ -381,32 +387,35 @@ class TestRepo(TestBase):
git.return_value = fixture('blame_incremental')
blame_output = self.rorepo.blame_incremental('9debf6b0aafb6f7781ea9d1383c86939a1aacde3', 'AUTHORS')
blame_output = list(blame_output)
- assert len(blame_output) == 5
+ self.assertEqual(len(blame_output), 5)
# Check all outputted line numbers
ranges = flatten([entry.linenos for entry in blame_output])
- assert ranges == flatten([range(2, 3), range(14, 15), range(1, 2), range(3, 14), range(15, 17)]), str(ranges)
+ self.assertEqual(ranges, flatten([range(2, 3), range(14, 15), range(1, 2), range(3, 14), range(15, 17)]))
commits = [entry.commit.hexsha[:7] for entry in blame_output]
- assert commits == ['82b8902', '82b8902', 'c76852d', 'c76852d', 'c76852d'], str(commits)
+ self.assertEqual(commits, ['82b8902', '82b8902', 'c76852d', 'c76852d', 'c76852d'])
# Original filenames
- assert all([entry.orig_path == u'AUTHORS' for entry in blame_output])
+ self.assertSequenceEqual([entry.orig_path for entry in blame_output], [u'AUTHORS'] * len(blame_output))
# Original line numbers
orig_ranges = flatten([entry.orig_linenos for entry in blame_output])
- assert orig_ranges == flatten([range(2, 3), range(14, 15), range(1, 2), range(2, 13), range(13, 15)]), str(orig_ranges) # noqa
+ self.assertEqual(orig_ranges, flatten([range(2, 3), range(14, 15), range(1, 2), range(2, 13), range(13, 15)])) # noqa E501
@patch.object(Git, '_call_process')
def test_blame_complex_revision(self, git):
git.return_value = fixture('blame_complex_revision')
res = self.rorepo.blame("HEAD~10..HEAD", "README.md")
- assert len(res) == 1
- assert len(res[0][1]) == 83, "Unexpected amount of parsed blame lines"
+ self.assertEqual(len(res), 1)
+ self.assertEqual(len(res[0][1]), 83, "Unexpected amount of parsed blame lines")
@with_rw_repo('HEAD', bare=False)
def test_untracked_files(self, rwrepo):
- for (run, repo_add) in enumerate((rwrepo.index.add, rwrepo.git.add)):
+ for run, (repo_add, is_invoking_git) in enumerate((
+ (rwrepo.index.add, False),
+ (rwrepo.git.add, True),
+ )):
base = rwrepo.working_tree_dir
files = (join_path_native(base, u"%i_test _myfile" % run),
join_path_native(base, "%i_test_other_file" % run),
@@ -424,10 +433,15 @@ class TestRepo(TestBase):
num_test_untracked = 0
for utfile in untracked_files:
num_test_untracked += join_path_native(base, utfile) in files
- assert len(files) == num_test_untracked
+ self.assertEqual(len(files), num_test_untracked)
+ if is_win and not PY3 and is_invoking_git:
+ ## On Windows, shell needed when passing unicode cmd-args.
+ #
+ repo_add = fnt.partial(repo_add, shell=True)
+ untracked_files = [win_encode(f) for f in untracked_files]
repo_add(untracked_files)
- assert len(rwrepo.untracked_files) == (num_recently_untracked - len(files))
+ self.assertEqual(len(rwrepo.untracked_files), (num_recently_untracked - len(files)))
# end for each run
def test_config_reader(self):
@@ -465,8 +479,9 @@ class TestRepo(TestBase):
def test_comparison_and_hash(self):
# this is only a preliminary test, more testing done in test_index
- assert self.rorepo == self.rorepo and not (self.rorepo != self.rorepo)
- assert len(set((self.rorepo, self.rorepo))) == 1
+ self.assertEqual(self.rorepo, self.rorepo)
+ self.assertFalse(self.rorepo != self.rorepo)
+ self.assertEqual(len(set((self.rorepo, self.rorepo))), 1)
@with_rw_directory
def test_tilde_and_env_vars_in_repo_path(self, rw_dir):
@@ -505,57 +520,59 @@ class TestRepo(TestBase):
# readlines no limit
s = mkfull()
lines = s.readlines()
- assert len(lines) == 3 and lines[-1].endswith(b'\n')
- assert s._stream.tell() == len(d) # must have scrubbed to the end
+ self.assertEqual(len(lines), 3)
+ self.assertTrue(lines[-1].endswith(b'\n'), lines[-1])
+ self.assertEqual(s._stream.tell(), len(d)) # must have scrubbed to the end
# realines line limit
s = mkfull()
lines = s.readlines(5)
- assert len(lines) == 1
+ self.assertEqual(len(lines), 1)
# readlines on tiny sections
s = mktiny()
lines = s.readlines()
- assert len(lines) == 1 and lines[0] == l1p
- assert s._stream.tell() == ts + 1
+ self.assertEqual(len(lines), 1)
+ self.assertEqual(lines[0], l1p)
+ self.assertEqual(s._stream.tell(), ts + 1)
# readline no limit
s = mkfull()
- assert s.readline() == l1
- assert s.readline() == l2
- assert s.readline() == l3
- assert s.readline() == b''
- assert s._stream.tell() == len(d)
+ self.assertEqual(s.readline(), l1)
+ self.assertEqual(s.readline(), l2)
+ self.assertEqual(s.readline(), l3)
+ self.assertEqual(s.readline(), b'')
+ self.assertEqual(s._stream.tell(), len(d))
# readline limit
s = mkfull()
- assert s.readline(5) == l1p
- assert s.readline() == l1[5:]
+ self.assertEqual(s.readline(5), l1p)
+ self.assertEqual(s.readline(), l1[5:])
# readline on tiny section
s = mktiny()
- assert s.readline() == l1p
- assert s.readline() == b''
- assert s._stream.tell() == ts + 1
+ self.assertEqual(s.readline(), l1p)
+ self.assertEqual(s.readline(), b'')
+ self.assertEqual(s._stream.tell(), ts + 1)
# read no limit
s = mkfull()
- assert s.read() == d[:-1]
- assert s.read() == b''
- assert s._stream.tell() == len(d)
+ self.assertEqual(s.read(), d[:-1])
+ self.assertEqual(s.read(), b'')
+ self.assertEqual(s._stream.tell(), len(d))
# read limit
s = mkfull()
- assert s.read(5) == l1p
- assert s.read(6) == l1[5:]
- assert s._stream.tell() == 5 + 6 # its not yet done
+ self.assertEqual(s.read(5), l1p)
+ self.assertEqual(s.read(6), l1[5:])
+ self.assertEqual(s._stream.tell(), 5 + 6) # its not yet done
# read tiny
s = mktiny()
- assert s.read(2) == l1[:2]
- assert s._stream.tell() == 2
- assert s.read() == l1[2:ts]
- assert s._stream.tell() == ts + 1
+ self.assertEqual(s.read(2), l1[:2])
+ self.assertEqual(s._stream.tell(), 2)
+ self.assertEqual(s.read(), l1[2:ts])
+ self.assertEqual(s._stream.tell(), ts + 1)
def _assert_rev_parse_types(self, name, rev_obj):
rev_parse = self.rorepo.rev_parse
@@ -565,11 +582,12 @@ class TestRepo(TestBase):
# tree and blob type
obj = rev_parse(name + '^{tree}')
- assert obj == rev_obj.tree
+ self.assertEqual(obj, rev_obj.tree)
obj = rev_parse(name + ':CHANGES')
- assert obj.type == 'blob' and obj.path == 'CHANGES'
- assert rev_obj.tree['CHANGES'] == obj
+ self.assertEqual(obj.type, 'blob')
+ self.assertEqual(obj.path, 'CHANGES')
+ self.assertEqual(rev_obj.tree['CHANGES'], obj)
def _assert_rev_parse(self, name):
"""tries multiple different rev-parse syntaxes with the given name
@@ -585,7 +603,7 @@ class TestRepo(TestBase):
# try history
rev = name + "~"
obj2 = rev_parse(rev)
- assert obj2 == obj.parents[0]
+ self.assertEqual(obj2, obj.parents[0])
self._assert_rev_parse_types(rev, obj2)
# history with number
@@ -598,20 +616,20 @@ class TestRepo(TestBase):
for pn in range(11):
rev = name + "~%i" % (pn + 1)
obj2 = rev_parse(rev)
- assert obj2 == history[pn]
+ self.assertEqual(obj2, history[pn])
self._assert_rev_parse_types(rev, obj2)
# END history check
# parent ( default )
rev = name + "^"
obj2 = rev_parse(rev)
- assert obj2 == obj.parents[0]
+ self.assertEqual(obj2, obj.parents[0])
self._assert_rev_parse_types(rev, obj2)
# parent with number
for pn, parent in enumerate(obj.parents):
rev = name + "^%i" % (pn + 1)
- assert rev_parse(rev) == parent
+ self.assertEqual(rev_parse(rev), parent)
self._assert_rev_parse_types(rev, parent)
# END for each parent
@@ -627,7 +645,7 @@ class TestRepo(TestBase):
rev_parse = self.rorepo.rev_parse
# try special case: This one failed at some point, make sure its fixed
- assert rev_parse("33ebe").hexsha == "33ebe7acec14b25c5f84f35a664803fcab2f7781"
+ self.assertEqual(rev_parse("33ebe").hexsha, "33ebe7acec14b25c5f84f35a664803fcab2f7781")
# start from reference
num_resolved = 0
@@ -638,7 +656,7 @@ class TestRepo(TestBase):
path_section = '/'.join(path_tokens[-(pt + 1):])
try:
obj = self._assert_rev_parse(path_section)
- assert obj.type == ref.object.type
+ self.assertEqual(obj.type, ref.object.type)
num_resolved += 1
except (BadName, BadObject):
print("failed on %s" % path_section)
@@ -653,31 +671,31 @@ class TestRepo(TestBase):
# it works with tags !
tag = self._assert_rev_parse('0.1.4')
- assert tag.type == 'tag'
+ self.assertEqual(tag.type, 'tag')
# try full sha directly ( including type conversion )
- assert tag.object == rev_parse(tag.object.hexsha)
+ self.assertEqual(tag.object, rev_parse(tag.object.hexsha))
self._assert_rev_parse_types(tag.object.hexsha, tag.object)
# multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES
rev = '0.1.4^{tree}^{tree}'
- assert rev_parse(rev) == tag.object.tree
- assert rev_parse(rev + ':CHANGES') == tag.object.tree['CHANGES']
+ self.assertEqual(rev_parse(rev), tag.object.tree)
+ self.assertEqual(rev_parse(rev + ':CHANGES'), tag.object.tree['CHANGES'])
# try to get parents from first revision - it should fail as no such revision
# exists
first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781"
commit = rev_parse(first_rev)
- assert len(commit.parents) == 0
- assert commit.hexsha == first_rev
+ self.assertEqual(len(commit.parents), 0)
+ self.assertEqual(commit.hexsha, first_rev)
self.failUnlessRaises(BadName, rev_parse, first_rev + "~")
self.failUnlessRaises(BadName, rev_parse, first_rev + "^")
# short SHA1
commit2 = rev_parse(first_rev[:20])
- assert commit2 == commit
+ self.assertEqual(commit2, commit)
commit2 = rev_parse(first_rev[:5])
- assert commit2 == commit
+ self.assertEqual(commit2, commit)
# todo: dereference tag into a blob 0.1.7^{blob} - quite a special one
# needs a tag which points to a blob
@@ -685,13 +703,13 @@ class TestRepo(TestBase):
# ref^0 returns commit being pointed to, same with ref~0, and ^{}
tag = rev_parse('0.1.4')
for token in (('~0', '^0', '^{}')):
- assert tag.object == rev_parse('0.1.4%s' % token)
+ self.assertEqual(tag.object, rev_parse('0.1.4%s' % token))
# END handle multiple tokens
# try partial parsing
max_items = 40
for i, binsha in enumerate(self.rorepo.odb.sha_iter()):
- assert rev_parse(bin_to_hex(binsha)[:8 - (i % 2)].decode('ascii')).binsha == binsha
+ self.assertEqual(rev_parse(bin_to_hex(binsha)[:8 - (i % 2)].decode('ascii')).binsha, binsha)
if i > max_items:
# this is rather slow currently, as rev_parse returns an object
# which requires accessing packs, it has some additional overhead
@@ -712,13 +730,13 @@ class TestRepo(TestBase):
self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha)
# uses HEAD.ref by default
- assert rev_parse('@{0}') == head.commit
+ self.assertEqual(rev_parse('@{0}'), head.commit)
if not head.is_detached:
refspec = '%s@{0}' % head.ref.name
- assert rev_parse(refspec) == head.ref.commit
+ self.assertEqual(rev_parse(refspec), head.ref.commit)
# all additional specs work as well
- assert rev_parse(refspec + "^{tree}") == head.commit.tree
- assert rev_parse(refspec + ":CHANGES").type == 'blob'
+ self.assertEqual(rev_parse(refspec + "^{tree}"), head.commit.tree)
+ self.assertEqual(rev_parse(refspec + ":CHANGES").type, 'blob')
# END operate on non-detached head
# position doesn't exist
@@ -734,13 +752,13 @@ class TestRepo(TestBase):
target_type = GitCmdObjectDB
if sys.version_info[:2] < (2, 5):
target_type = GitCmdObjectDB
- assert isinstance(self.rorepo.odb, target_type)
+ self.assertIsInstance(self.rorepo.odb, target_type)
def test_submodules(self):
- assert len(self.rorepo.submodules) == 1 # non-recursive
- assert len(list(self.rorepo.iter_submodules())) >= 2
+ self.assertEqual(len(self.rorepo.submodules), 1) # non-recursive
+ self.assertGreaterEqual(len(list(self.rorepo.iter_submodules())), 2)
- assert isinstance(self.rorepo.submodule("gitdb"), Submodule)
+ self.assertIsInstance(self.rorepo.submodule("gitdb"), Submodule)
self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist")
@with_rw_repo('HEAD', bare=False)
@@ -753,7 +771,7 @@ class TestRepo(TestBase):
# test create submodule
sm = rwrepo.submodules[0]
sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path))
- assert isinstance(sm, Submodule)
+ self.assertIsInstance(sm, Submodule)
# note: the rest of this functionality is tested in test_submodule
@@ -767,12 +785,12 @@ class TestRepo(TestBase):
# Create a repo and make sure it's pointing to the relocated .git directory.
git_file_repo = Repo(rwrepo.working_tree_dir)
- assert os.path.abspath(git_file_repo.git_dir) == real_path_abs
+ self.assertEqual(os.path.abspath(git_file_repo.git_dir), real_path_abs)
# Test using an absolute gitdir path in the .git file.
open(git_file_path, 'wb').write(('gitdir: %s\n' % real_path_abs).encode('ascii'))
git_file_repo = Repo(rwrepo.working_tree_dir)
- assert os.path.abspath(git_file_repo.git_dir) == real_path_abs
+ self.assertEqual(os.path.abspath(git_file_repo.git_dir), real_path_abs)
def test_file_handle_leaks(self):
def last_commit(repo, rev, path):
@@ -793,7 +811,7 @@ class TestRepo(TestBase):
def test_remote_method(self):
self.failUnlessRaises(ValueError, self.rorepo.remote, 'foo-blue')
- assert isinstance(self.rorepo.remote(name='origin'), Remote)
+ self.assertIsInstance(self.rorepo.remote(name='origin'), Remote)
@with_rw_directory
def test_empty_repo(self, rw_dir):
@@ -801,7 +819,7 @@ class TestRepo(TestBase):
r = Repo.init(rw_dir, mkdir=False)
# It's ok not to be able to iterate a commit, as there is none
self.failUnlessRaises(ValueError, r.iter_commits)
- assert r.active_branch.name == 'master'
+ self.assertEqual(r.active_branch.name, 'master')
assert not r.active_branch.is_valid(), "Branch is yet to be born"
# actually, when trying to create a new branch without a commit, git itself fails
@@ -841,12 +859,15 @@ class TestRepo(TestBase):
# two commit merge-base
res = repo.merge_base(c1, c2)
- assert isinstance(res, list) and len(res) == 1 and isinstance(res[0], Commit)
- assert res[0].hexsha.startswith('3936084')
+ self.assertIsInstance(res, list)
+ self.assertEqual(len(res), 1)
+ self.assertIsInstance(res[0], Commit)
+ self.assertTrue(res[0].hexsha.startswith('3936084'))
for kw in ('a', 'all'):
res = repo.merge_base(c1, c2, c3, **{kw: True})
- assert isinstance(res, list) and len(res) == 1
+ self.assertIsInstance(res, list)
+ self.assertEqual(len(res), 1)
# end for each keyword signalling all merge-bases to be returned
# Test for no merge base - can't do as we have
diff --git a/git/test/test_submodule.py b/git/test/test_submodule.py
index dcfe9216..8e2829b2 100644
--- a/git/test/test_submodule.py
+++ b/git/test/test_submodule.py
@@ -309,7 +309,8 @@ class TestSubmodule(TestBase):
# but ... we have untracked files in the child submodule
fn = join_path_native(csm.module().working_tree_dir, "newfile")
- open(fn, 'w').write("hi")
+ with open(fn, 'w') as fd:
+ fd.write("hi")
self.failUnlessRaises(InvalidGitRepositoryError, sm.remove)
# forcibly delete the child repository