Diffstat (limited to 'git/test')
-rw-r--r--  git/test/__init__.py                              |   2
-rw-r--r--  git/test/db/base.py                               | 285
-rw-r--r--  git/test/db/cmd/test_base.py                      |  66
-rw-r--r--  git/test/db/dulwich/lib.py                        |   8
-rw-r--r--  git/test/db/dulwich/test_base.py                  |  10
-rw-r--r--  git/test/db/lib.py                                | 105
-rw-r--r--  git/test/db/py/test_base.py                       |   6
-rw-r--r--  git/test/db/py/test_git.py                        |  22
-rw-r--r--  git/test/db/py/test_loose.py                      |  18
-rw-r--r--  git/test/db/py/test_mem.py                        |  13
-rw-r--r--  git/test/db/py/test_pack.py                       |  36
-rw-r--r--  git/test/db/py/test_ref.py                        |  32
-rw-r--r--  git/test/db/pygit2/lib.py                         |   8
-rw-r--r--  git/test/db/pygit2/test_base.py                   |  10
-rw-r--r--  git/test/db/test_base.py                          |   2
-rw-r--r--  git/test/lib/__init__.py                          |   6
-rw-r--r--  git/test/lib/asserts.py                           |  18
-rw-r--r--  git/test/lib/base.py                              |  76
-rw-r--r--  git/test/lib/helper.py                            | 139
-rw-r--r--  git/test/objects/__init__.py                      |   1
-rw-r--r--  git/test/objects/lib.py                           |  16
-rw-r--r--  git/test/objects/test_blob.py                     |  10
-rw-r--r--  git/test/objects/test_commit.py                   | 132
-rw-r--r--  git/test/objects/test_submodule.py                | 253
-rw-r--r--  git/test/objects/test_tree.py                     |  75
-rw-r--r--  git/test/performance/db/__init__.py               |   1
-rw-r--r--  git/test/performance/db/looseodb_impl.py          |  65
-rw-r--r--  git/test/performance/db/odb_impl.py               |  34
-rw-r--r--  git/test/performance/db/packedodb_impl.py         |  38
-rw-r--r--  git/test/performance/db/test_looseodb_cmd.py      |   7
-rw-r--r--  git/test/performance/db/test_looseodb_dulwich.py  |   4
-rw-r--r--  git/test/performance/db/test_looseodb_pure.py     |   2
-rw-r--r--  git/test/performance/db/test_looseodb_pygit2.py   |   4
-rw-r--r--  git/test/performance/db/test_odb_cmd.py           |   2
-rw-r--r--  git/test/performance/db/test_odb_dulwich.py       |   4
-rw-r--r--  git/test/performance/db/test_odb_pure.py          |   2
-rw-r--r--  git/test/performance/db/test_odb_pygit2.py        |   4
-rw-r--r--  git/test/performance/db/test_packedodb_pure.py    |  37
-rw-r--r--  git/test/performance/lib.py                       |  32
-rw-r--r--  git/test/performance/objects/__init__.py          |   1
-rw-r--r--  git/test/performance/objects/test_commit.py       |  47
-rw-r--r--  git/test/performance/test_utils.py                |  87
-rw-r--r--  git/test/refs/__init__.py                         |   1
-rw-r--r--  git/test/refs/test_reflog.py                      |  50
-rw-r--r--  git/test/refs/test_refs.py                        | 220
-rw-r--r--  git/test/test_base.py                             | 123
-rw-r--r--  git/test/test_cmd.py                              |  37
-rw-r--r--  git/test/test_config.py                           |  49
-rw-r--r--  git/test/test_diff.py                             |  57
-rw-r--r--  git/test/test_example.py                          |  28
-rw-r--r--  git/test/test_fun.py                              | 114
-rw-r--r--  git/test/test_import.py                           |  27
-rw-r--r--  git/test/test_index.py                            | 314
-rw-r--r--  git/test/test_pack.py                             | 134
-rw-r--r--  git/test/test_remote.py                           | 228
-rw-r--r--  git/test/test_stats.py                            |  17
-rw-r--r--  git/test/test_stream.py                           |  78
-rw-r--r--  git/test/test_util.py                             | 111
58 files changed, 1673 insertions, 1635 deletions
diff --git a/git/test/__init__.py b/git/test/__init__.py
index f71cbdf0..bc832619 100644
--- a/git/test/__init__.py
+++ b/git/test/__init__.py
@@ -6,9 +6,9 @@
import git.util
+
def _init_pool():
"""Assure the pool is actually threaded"""
size = 2
print "Setting ThreadPool to %i" % size
git.util.pool.set_size(size)
-
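The hunk above only reflows `_init_pool`, but it is the one knob this test package sets globally. A minimal sketch of how such a hook is typically wired up under nose, which calls a package-level `setup_package()` once before any test in the package runs; `git.util.pool.set_size` is taken from the hunk, the hook name and wiring are assumptions:

    import git.util

    def setup_package():
        # force more than one worker so async object-database code
        # paths really run threaded during the test run
        size = 2
        print("Setting ThreadPool to %i" % size)
        git.util.pool.set_size(size)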
diff --git a/git/test/db/base.py b/git/test/db/base.py
index 80cb9ebb..dd8e9d8f 100644
--- a/git/test/db/base.py
+++ b/git/test/db/base.py
@@ -14,7 +14,8 @@ from git.util import join_path_native
from git.exc import BadObject
from git.util import hex_to_bin, bin_to_hex
-import os, sys
+import os
+import sys
import tempfile
import shutil
from cStringIO import StringIO
@@ -24,12 +25,13 @@ from git.db.compat import RepoCompatibilityInterface
class RepoGlobalsItemDeletorMetaCls(GlobalsItemDeletorMetaCls):
ModuleToDelete = 'RepoBase'
-
+
class RepoBase(TestDBBase):
+
"""Basic test for everything a fully implemented repository should support"""
__metaclass__ = RepoGlobalsItemDeletorMetaCls
-
+
def test_new_should_raise_on_invalid_repo_location(self):
self.failUnlessRaises(InvalidGitRepositoryError, self.RepoCls, tempfile.gettempdir())
@@ -55,21 +57,21 @@ class RepoBase(TestDBBase):
def test_heads_should_populate_head_data(self):
for head in self.rorepo.heads:
assert head.name
- assert isinstance(head.commit,Commit)
- # END for each head
-
+ assert isinstance(head.commit, Commit)
+ # END for each head
+
assert isinstance(self.rorepo.heads.master, Head)
assert isinstance(self.rorepo.heads['master'], Head)
-
+
def test_tree_from_revision(self):
tree = self.rorepo.tree('0.1.6')
- assert len(tree.hexsha) == 40
+ assert len(tree.hexsha) == 40
assert tree.type == "tree"
assert self.rorepo.tree(tree) == tree
-
+
# try from invalid revision that does not exist
self.failUnlessRaises(BadObject, self.rorepo.tree, 'hello world')
-
+
def test_commit_from_revision(self):
commit = self.rorepo.commit('0.1.4')
assert commit.type == 'commit'
@@ -79,7 +81,7 @@ class RepoBase(TestDBBase):
mc = 10
commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc))
assert len(commits) == mc
-
+
c = commits[0]
assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha)
assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents])
@@ -87,11 +89,11 @@ class RepoBase(TestDBBase):
assert_equal("Michael Trier", c.author.name)
assert_equal("mtrier@gmail.com", c.author.email)
assert_equal(1232829715, c.authored_date)
- assert_equal(5*3600, c.author_tz_offset)
+ assert_equal(5 * 3600, c.author_tz_offset)
assert_equal("Michael Trier", c.committer.name)
assert_equal("mtrier@gmail.com", c.committer.email)
assert_equal(1232829715, c.committed_date)
- assert_equal(5*3600, c.committer_tz_offset)
+ assert_equal(5 * 3600, c.committer_tz_offset)
assert_equal("Bumped version 0.1.6\n", c.message)
c = commits[1]
@@ -106,34 +108,32 @@ class RepoBase(TestDBBase):
# END for each tree
assert num_trees == mc
-
def _assert_empty_repo(self, repo):
- # test all kinds of things with an empty, freshly initialized repo.
+ # test all kinds of things with an empty, freshly initialized repo.
# It should throw good errors
-
+
# entries should be empty
assert len(repo.index.entries) == 0
-
+
# head is accessible
assert repo.head
assert repo.head.ref
assert not repo.head.is_valid()
-
+
# we can change the head to some other ref
head_ref = Head.from_path(repo, Head.to_full_path('some_head'))
assert not head_ref.is_valid()
repo.head.ref = head_ref
-
+
# is_dirty can handle all kwargs
for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)):
assert not repo.is_dirty(*args)
- # END for each arg
-
+ # END for each arg
+
# we can add a file to the index ( if we are not bare )
if not repo.bare:
pass
# END test repos with working tree
-
def test_init(self):
prev_cwd = os.getcwd()
@@ -148,15 +148,14 @@ class RepoBase(TestDBBase):
assert isinstance(r, self.RepoCls)
assert r.bare == True
assert os.path.isdir(r.git_dir)
-
+
self._assert_empty_repo(r)
-
+
# test clone
clone_path = path + "_clone"
rc = r.clone(clone_path)
self._assert_empty_repo(rc)
-
-
+
try:
shutil.rmtree(clone_path)
except OSError:
@@ -164,11 +163,11 @@ class RepoBase(TestDBBase):
# of the parent directory
pass
# END exception handling
-
+
# try again, this time with the absolute version
rc = self.RepoCls.clone_from(r.git_dir, clone_path)
self._assert_empty_repo(rc)
-
+
shutil.rmtree(git_dir_abs)
try:
shutil.rmtree(clone_path)
@@ -177,14 +176,14 @@ class RepoBase(TestDBBase):
# of the parent directory
pass
# END exception handling
-
+
# END for each path
-
+
os.makedirs(git_dir_rela)
os.chdir(git_dir_rela)
r = self.RepoCls.init(bare=False)
r.bare == False
-
+
self._assert_empty_repo(r)
finally:
try:
@@ -193,26 +192,26 @@ class RepoBase(TestDBBase):
pass
os.chdir(prev_cwd)
# END restore previous state
-
+
def test_bare_property(self):
if isinstance(self.rorepo, RepoCompatibilityInterface):
self.rorepo.bare
- #END handle compatability
+ # END handle compatibility
self.rorepo.is_bare
def test_daemon_export(self):
orig_val = self.rorepo.daemon_export
self.rorepo.daemon_export = not orig_val
- assert self.rorepo.daemon_export == ( not orig_val )
+ assert self.rorepo.daemon_export == (not orig_val)
self.rorepo.daemon_export = orig_val
assert self.rorepo.daemon_export == orig_val
-
+
def test_alternates(self):
cur_alternates = self.rorepo.alternates
# empty alternates
self.rorepo.alternates = []
assert self.rorepo.alternates == []
- alts = [ "other/location", "this/location" ]
+ alts = ["other/location", "this/location"]
self.rorepo.alternates = alts
assert alts == self.rorepo.alternates
self.rorepo.alternates = cur_alternates
@@ -224,13 +223,13 @@ class RepoBase(TestDBBase):
orig_value = self.rorepo._bare
self.rorepo._bare = True
assert_false(self.rorepo.is_dirty())
- self.rorepo._bare = orig_value
+ self.rorepo._bare = orig_value
def test_is_dirty(self):
self.rorepo._bare = False
- for index in (0,1):
- for working_tree in (0,1):
- for untracked_files in (0,1):
+ for index in (0, 1):
+ for working_tree in (0, 1):
+ for untracked_files in (0, 1):
assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False)
# END untracked files
# END working tree
@@ -246,28 +245,28 @@ class RepoBase(TestDBBase):
def test_index(self):
index = self.rorepo.index
assert isinstance(index, IndexFile)
-
+
def test_tag(self):
assert self.rorepo.tag('0.1.5').commit
assert self.rorepo.tag('refs/tags/0.1.5').commit
-
+
def test_archive(self):
tmpfile = os.tmpfile()
self.rorepo.archive(tmpfile, '0.1.5')
assert tmpfile.tell()
-
+
@patch.object(Git, '_call_process')
def test_should_display_blame_information(self, git):
git.return_value = fixture('blame')
- b = self.rorepo.blame( 'master', 'lib/git.py')
+ b = self.rorepo.blame('master', 'lib/git.py')
assert_equal(13, len(b))
- assert_equal( 2, len(b[0]) )
+ assert_equal(2, len(b[0]))
# assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b))
assert_equal(hash(b[0][0]), hash(b[9][0]))
c = b[0][0]
assert_true(git.called)
assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True}))
-
+
assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha)
assert_equal('Tom Preston-Werner', c.author.name)
assert_equal('tom@mojombo.com', c.author.email)
@@ -276,35 +275,35 @@ class RepoBase(TestDBBase):
assert_equal('tom@mojombo.com', c.committer.email)
assert_equal(1191997100, c.committed_date)
assert_equal('initial grit setup', c.message)
-
+
# test the 'lines per commit' entries
tlist = b[0][1]
- assert_true( tlist )
- assert_true( isinstance( tlist[0], basestring ) )
- assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug
-
+ assert_true(tlist)
+ assert_true(isinstance(tlist[0], basestring))
+ assert_true(len(tlist) < sum(len(t) for t in tlist)) # test for single-char bug
+
def test_blame_real(self):
c = 0
for item in self.rorepo.head.commit.tree.traverse(
- predicate=lambda i, d: i.type == 'blob' and i.path.endswith('.py')):
+ predicate=lambda i, d: i.type == 'blob' and i.path.endswith('.py')):
c += 1
b = self.rorepo.blame(self.rorepo.head, item.path)
- #END for each item to traverse
+ # END for each item to traverse
assert c
-
+
def test_untracked_files(self):
base = self.rorepo.working_tree_dir
- files = ( join_path_native(base, "__test_myfile"),
- join_path_native(base, "__test_other_file") )
+ files = (join_path_native(base, "__test_myfile"),
+ join_path_native(base, "__test_other_file"))
num_recently_untracked = 0
try:
for fpath in files:
- fd = open(fpath,"wb")
+ fd = open(fpath, "wb")
fd.close()
# END for each filename
untracked_files = self.rorepo.untracked_files
num_recently_untracked = len(untracked_files)
-
+
# assure we have all names - they are relative to the git-dir
num_test_untracked = 0
for utfile in untracked_files:
@@ -314,80 +313,81 @@ class RepoBase(TestDBBase):
for fpath in files:
if os.path.isfile(fpath):
os.remove(fpath)
- # END handle files
-
+ # END handle files
+
assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files))
-
+
def test_config_reader(self):
- reader = self.rorepo.config_reader() # all config files
+ reader = self.rorepo.config_reader() # all config files
assert reader.read_only
reader = self.rorepo.config_reader("repository") # single config file
assert reader.read_only
-
+
def test_config_writer(self):
for config_level in self.rorepo.config_level:
try:
writer = self.rorepo.config_writer(config_level)
assert not writer.read_only
except IOError:
- # its okay not to get a writer for some configuration files if we
+ # its okay not to get a writer for some configuration files if we
# have no permissions
- pass
- # END for each config level
-
+ pass
+ # END for each config level
+
def test_creation_deletion(self):
- # just a very quick test to assure it generally works. There are
+ # just a very quick test to assure it generally works. There are
# specialized cases in the test_refs module
head = self.rorepo.create_head("new_head", "HEAD~1")
self.rorepo.delete_head(head)
-
+
tag = self.rorepo.create_tag("new_tag", "HEAD~2")
self.rorepo.delete_tag(tag)
self.rorepo.config_writer()
remote = self.rorepo.create_remote("new_remote", "git@server:repo.git")
self.rorepo.delete_remote(remote)
-
+
def test_comparison_and_hash(self):
# this is only a preliminary test, more testing done in test_index
assert self.rorepo == self.rorepo and not (self.rorepo != self.rorepo)
assert len(set((self.rorepo, self.rorepo))) == 1
-
+
def test_git_cmd(self):
# test CatFileContentStream, just to be very sure we have no fencepost errors
# last \n is the terminating newline that it expects
l1 = "0123456789\n"
l2 = "abcdefghijklmnopqrstxy\n"
- l3 = "z\n"
+ l3 = "z\n"
d = "%s%s%s\n" % (l1, l2, l3)
-
+
l1p = l1[:5]
-
+
# full size
# size is without terminating newline
def mkfull():
- return Git.CatFileContentStream(len(d)-1, StringIO(d))
-
+ return Git.CatFileContentStream(len(d) - 1, StringIO(d))
+
ts = 5
+
def mktiny():
return Git.CatFileContentStream(ts, StringIO(d))
-
+
# readlines no limit
s = mkfull()
lines = s.readlines()
assert len(lines) == 3 and lines[-1].endswith('\n')
assert s._stream.tell() == len(d) # must have scrubbed to the end
-
+
# readlines line limit
s = mkfull()
lines = s.readlines(5)
assert len(lines) == 1
-
+
# readlines on tiny sections
s = mktiny()
lines = s.readlines()
assert len(lines) == 1 and lines[0] == l1p
- assert s._stream.tell() == ts+1
-
+ assert s._stream.tell() == ts + 1
+
# readline no limit
s = mkfull()
assert s.readline() == l1
@@ -395,52 +395,51 @@ class RepoBase(TestDBBase):
assert s.readline() == l3
assert s.readline() == ''
assert s._stream.tell() == len(d)
-
+
# readline limit
s = mkfull()
assert s.readline(5) == l1p
assert s.readline() == l1[5:]
-
+
# readline on tiny section
s = mktiny()
assert s.readline() == l1p
assert s.readline() == ''
- assert s._stream.tell() == ts+1
-
+ assert s._stream.tell() == ts + 1
+
# read no limit
s = mkfull()
assert s.read() == d[:-1]
assert s.read() == ''
assert s._stream.tell() == len(d)
-
+
# read limit
s = mkfull()
assert s.read(5) == l1p
assert s.read(6) == l1[5:]
assert s._stream.tell() == 5 + 6 # its not yet done
-
+
# read tiny
s = mktiny()
assert s.read(2) == l1[:2]
assert s._stream.tell() == 2
assert s.read() == l1[2:ts]
- assert s._stream.tell() == ts+1
-
+ assert s._stream.tell() == ts + 1
+
def _assert_rev_parse_types(self, name, rev_obj):
rev_parse = self.rorepo.rev_parse
-
+
if rev_obj.type == 'tag':
rev_obj = rev_obj.object
-
+
# tree and blob type
obj = rev_parse(name + '^{tree}')
assert obj == rev_obj.tree
-
+
obj = rev_parse(name + ':CHANGES')
assert obj.type == 'blob' and obj.path == 'CHANGES'
assert rev_obj.tree['CHANGES'] == obj
-
-
+
def _assert_rev_parse(self, name):
"""tries multiple different rev-parse syntaxes with the given name
:return: parsed object"""
@@ -450,62 +449,62 @@ class RepoBase(TestDBBase):
obj = orig_obj.object
else:
obj = orig_obj
- # END deref tags by default
-
+ # END deref tags by default
+
# try history
rev = name + "~"
obj2 = rev_parse(rev)
assert obj2 == obj.parents[0]
self._assert_rev_parse_types(rev, obj2)
-
+
# history with number
ni = 11
history = [obj.parents[0]]
for pn in range(ni):
history.append(history[-1].parents[0])
# END get given amount of commits
-
+
for pn in range(11):
- rev = name + "~%i" % (pn+1)
+ rev = name + "~%i" % (pn + 1)
obj2 = rev_parse(rev)
assert obj2 == history[pn]
self._assert_rev_parse_types(rev, obj2)
# END history check
-
+
# parent ( default )
rev = name + "^"
obj2 = rev_parse(rev)
assert obj2 == obj.parents[0]
self._assert_rev_parse_types(rev, obj2)
-
+
# parent with number
for pn, parent in enumerate(obj.parents):
- rev = name + "^%i" % (pn+1)
+ rev = name + "^%i" % (pn + 1)
assert rev_parse(rev) == parent
self._assert_rev_parse_types(rev, parent)
# END for each parent
-
+
return orig_obj
-
+
@with_rw_repo('HEAD', bare=False)
def test_rw_rev_parse(self, rwrepo):
# verify it does not confuse branches with hexsha ids
ahead = rwrepo.create_head('aaaaaaaa')
assert(rwrepo.rev_parse(str(ahead)) == ahead.commit)
-
+
def test_rev_parse(self):
rev_parse = self.rorepo.rev_parse
-
+
# try special case: This one failed at some point, make sure its fixed
assert rev_parse("33ebe").hexsha == "33ebe7acec14b25c5f84f35a664803fcab2f7781"
-
+
# start from reference
num_resolved = 0
-
+
for ref in Reference.iter_items(self.rorepo):
path_tokens = ref.path.split("/")
for pt in range(len(path_tokens)):
- path_section = '/'.join(path_tokens[-(pt+1):])
+ path_section = '/'.join(path_tokens[-(pt + 1):])
try:
obj = self._assert_rev_parse(path_section)
assert obj.type == ref.object.type
@@ -518,106 +517,102 @@ class RepoBase(TestDBBase):
# END for each token
# END for each reference
assert num_resolved
-
+
# it works with tags !
tag = self._assert_rev_parse('0.1.4')
assert tag.type == 'tag'
-
+
# try full sha directly ( including type conversion )
assert tag.object == rev_parse(tag.object.hexsha)
self._assert_rev_parse_types(tag.object.hexsha, tag.object)
-
-
+
# multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES
rev = '0.1.4^{tree}^{tree}'
assert rev_parse(rev) == tag.object.tree
- assert rev_parse(rev+':CHANGES') == tag.object.tree['CHANGES']
-
-
+ assert rev_parse(rev + ':CHANGES') == tag.object.tree['CHANGES']
+
# try to get parents from first revision - it should fail as no such revision
# exists
first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781"
commit = rev_parse(first_rev)
assert len(commit.parents) == 0
assert commit.hexsha == first_rev
- self.failUnlessRaises(BadObject, rev_parse, first_rev+"~")
- self.failUnlessRaises(BadObject, rev_parse, first_rev+"^")
-
+ self.failUnlessRaises(BadObject, rev_parse, first_rev + "~")
+ self.failUnlessRaises(BadObject, rev_parse, first_rev + "^")
+
# short SHA1
commit2 = rev_parse(first_rev[:20])
assert commit2 == commit
commit2 = rev_parse(first_rev[:5])
assert commit2 == commit
-
-
+
# todo: dereference tag into a blob 0.1.7^{blob} - quite a special one
# needs a tag which points to a blob
-
-
+
# ref^0 returns commit being pointed to, same with ref~0, and ^{}
tag = rev_parse('0.1.4')
for token in (('~0', '^0', '^{}')):
assert tag.object == rev_parse('0.1.4%s' % token)
# END handle multiple tokens
-
+
# try partial parsing
max_items = 40
for i, binsha in enumerate(self.rorepo.odb.sha_iter()):
- assert rev_parse(bin_to_hex(binsha)[:8-(i%2)]).binsha == binsha
+ assert rev_parse(bin_to_hex(binsha)[:8 - (i % 2)]).binsha == binsha
if i > max_items:
# this is rather slow currently, as rev_parse returns an object
# which requires accessing packs, it has some additional overhead
break
# END for each binsha in repo
-
+
# missing closing brace commit^{tree
self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree')
-
+
# missing starting brace
self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}')
-
+
# REVLOG
#######
head = self.rorepo.head
-
+
# need to specify a ref when using the @ syntax
self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha)
-
+
# uses HEAD.ref by default
assert rev_parse('@{0}') == head.commit
if not head.is_detached:
refspec = '%s@{0}' % head.ref.name
assert rev_parse(refspec) == head.ref.commit
# all additional specs work as well
- assert rev_parse(refspec+"^{tree}") == head.commit.tree
- assert rev_parse(refspec+":CHANGES").type == 'blob'
- #END operate on non-detached head
-
+ assert rev_parse(refspec + "^{tree}") == head.commit.tree
+ assert rev_parse(refspec + ":CHANGES").type == 'blob'
+ # END operate on non-detached head
+
# the most recent previous position of the currently checked out branch
-
+
try:
assert rev_parse('@{1}') != head.commit
except IndexError:
# on new checkouts, there isn't even a single past branch position
# in the log
pass
- #END handle fresh checkouts
-
+ # END handle fresh checkouts
+
# position doesn't exist
self.failUnlessRaises(IndexError, rev_parse, '@{10000}')
-
+
# currently, nothing more is supported
self.failUnlessRaises(NotImplementedError, rev_parse, "@{1 week ago}")
-
+
def test_submodules(self):
assert len(self.rorepo.submodules) == 2 # non-recursive
# in previous configurations, we had recursive repositories so this would compare to 2
# now there is only one left, as gitdb was merged, but we have smmap instead
assert len(list(self.rorepo.iter_submodules())) == 2
-
+
assert isinstance(self.rorepo.submodule("async"), Submodule)
self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist")
-
+
@with_rw_repo('HEAD', bare=False)
def test_submodule_update(self, rwrepo):
# fails in bare mode
@@ -629,13 +624,11 @@ class RepoBase(TestDBBase):
rwrepo._bare = False
if rwrepo.bare:
rwrepo.bare = False
- #END special repo handling
-
+ # END special repo handling
+
# test create submodule
sm = rwrepo.submodules[0]
sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path))
assert isinstance(sm, Submodule)
-
+
# note: the rest of this functionality is tested in test_submodule
-
-
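The rev-parse assertions above walk a fixed family of spellings. A hedged recap in code, assuming a repository object of the same RepoCls the tests use (PureCompatibilityGitDB appears later in this diff) and a checkout that contains the referenced tag and file:

    from git.db.complex import PureCompatibilityGitDB

    repo = PureCompatibilityGitDB('.')      # assumption: cwd is a repository
    tag = repo.rev_parse('0.1.4')           # the tag object itself
    commit = repo.rev_parse('0.1.4^{}')     # peel the tag to its commit
    parent = repo.rev_parse('0.1.4~1')      # first ancestor of that commit
    tree = repo.rev_parse('0.1.4^{tree}')   # tree of the tagged commit
    blob = repo.rev_parse('0.1.4:CHANGES')  # blob at path CHANGES
    prev = repo.rev_parse('@{1}')           # reflog; may raise IndexError on
                                            # fresh checkouts, as tested above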
diff --git a/git/test/db/cmd/test_base.py b/git/test/db/cmd/test_base.py
index 890c0232..9eee7223 100644
--- a/git/test/db/cmd/test_base.py
+++ b/git/test/db/cmd/test_base.py
@@ -13,79 +13,77 @@ from git.db.cmd.base import *
from git.refs import TagReference, Reference, RemoteReference
+
class TestBase(RepoBase):
RepoCls = CmdCompatibilityGitDB
def test_basics(self):
gdb = self.rorepo
-
+
# partial to complete - works with everything
hexsha = bin_to_hex(gdb.partial_to_complete_sha_hex("0.1.6"))
assert len(hexsha) == 40
-
+
assert bin_to_hex(gdb.partial_to_complete_sha_hex(hexsha[:20])) == hexsha
-
+
# fails with BadObject
for invalid_rev in ("0000", "bad/ref", "super bad"):
self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, invalid_rev)
-
+
def test_fetch_info(self):
self.failUnlessRaises(ValueError, CmdCmdFetchInfo._from_line, self.rorepo, "nonsense", '')
- self.failUnlessRaises(ValueError, CmdCmdFetchInfo._from_line, self.rorepo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '')
-
-
+ self.failUnlessRaises(ValueError, CmdCmdFetchInfo._from_line, self.rorepo,
+ "? [up to date] 0.1.7RC -> origin/0.1.7RC", '')
+
def test_fetch_info(self):
# assure we can handle remote-tracking branches
fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of git://github.com/gitpython-developers/GitPython"
remote_info_line_fmt = "* [new branch] nomatter -> %s"
fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "local/master",
- fetch_info_line_fmt % 'remote-tracking branch')
-
+ remote_info_line_fmt % "local/master",
+ fetch_info_line_fmt % 'remote-tracking branch')
+
# we wouldn't be here if it wouldn't have worked
-
+
# handles non-default refspecs: One can specify a different path in refs/remotes
# or a special path just in refs/something for instance
-
+
fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "subdir/tagname",
- fetch_info_line_fmt % 'tag')
-
+ remote_info_line_fmt % "subdir/tagname",
+ fetch_info_line_fmt % 'tag')
+
assert isinstance(fi.ref, TagReference)
assert fi.ref.path.startswith('refs/tags')
-
+
+ # it could be in a remote directory though
fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "remotename/tags/tagname",
- fetch_info_line_fmt % 'tag')
-
+ remote_info_line_fmt % "remotename/tags/tagname",
+ fetch_info_line_fmt % 'tag')
+
assert isinstance(fi.ref, TagReference)
assert fi.ref.path.startswith('refs/remotes/')
-
+
# it can also be anywhere !
tag_path = "refs/something/remotename/tags/tagname"
fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % tag_path,
- fetch_info_line_fmt % 'tag')
-
+ remote_info_line_fmt % tag_path,
+ fetch_info_line_fmt % 'tag')
+
assert isinstance(fi.ref, TagReference)
assert fi.ref.path == tag_path
-
+
# branches default to refs/remotes
fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "remotename/branch",
- fetch_info_line_fmt % 'branch')
-
+ remote_info_line_fmt % "remotename/branch",
+ fetch_info_line_fmt % 'branch')
+
assert isinstance(fi.ref, RemoteReference)
assert fi.ref.remote_name == 'remotename'
-
+
# but you can force it anywhere, in which case we only have a references
fi = CmdFetchInfo._from_line(self.rorepo,
- remote_info_line_fmt % "refs/something/branch",
- fetch_info_line_fmt % 'branch')
-
+ remote_info_line_fmt % "refs/something/branch",
+ fetch_info_line_fmt % 'branch')
+
assert type(fi.ref) is Reference
assert fi.ref.path == "refs/something/branch"
-
-
-
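The pairs of lines fed to `CmdFetchInfo._from_line` above determine where a fetched ref gets filed. One pairing spelled out, reusing the test's two formats verbatim; the imports and repository construction assume both names are exported by git.db.cmd.base, as the test's star-import suggests:

    from git.db.cmd.base import CmdCompatibilityGitDB, CmdFetchInfo

    repo = CmdCompatibilityGitDB('.')   # assumption: cwd is a repository
    fetch_line = ("c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge "
                  "tag '0.3' of git://github.com/gitpython-developers/GitPython")
    remote_line = "* [new branch] nomatter -> remotename/tags/tagname"

    fi = CmdFetchInfo._from_line(repo, remote_line, fetch_line)
    # a 'tags/' segment inside a remote path yields a TagReference filed
    # under refs/remotes/, exactly as asserted above
    assert fi.ref.path.startswith('refs/remotes/')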
diff --git a/git/test/db/dulwich/lib.py b/git/test/db/dulwich/lib.py
index a58469f1..bd6a0564 100644
--- a/git/test/db/dulwich/lib.py
+++ b/git/test/db/dulwich/lib.py
@@ -1,14 +1,15 @@
"""dulwich specific utilities, as well as all the default ones"""
from git.test.lib import (
- InheritedTestMethodsOverrideWrapperMetaClsAutoMixin,
- needs_module_or_skip
- )
+ InheritedTestMethodsOverrideWrapperMetaClsAutoMixin,
+ needs_module_or_skip
+)
__all__ = ['needs_dulwich_or_skip', 'DulwichRequiredMetaMixin']
#{ Decorators
+
def needs_dulwich_or_skip(func):
"""Skip this test if we have no dulwich - print warning"""
return needs_module_or_skip('dulwich')(func)
@@ -17,6 +18,7 @@ def needs_dulwich_or_skip(func):
#{ MetaClasses
+
class DulwichRequiredMetaMixin(InheritedTestMethodsOverrideWrapperMetaClsAutoMixin):
decorator = [needs_dulwich_or_skip]
diff --git a/git/test/db/dulwich/test_base.py b/git/test/db/dulwich/test_base.py
index ed2f8975..82713103 100644
--- a/git/test/db/dulwich/test_base.py
+++ b/git/test/db/dulwich/test_base.py
@@ -7,7 +7,6 @@ from git.test.lib import TestBase, with_rw_repo
from git.test.db.base import RepoBase
-
try:
import dulwich
except ImportError:
@@ -17,16 +16,15 @@ except ImportError:
else:
# now we know dulwich is available, to do futher imports
from git.db.dulwich.complex import DulwichCompatibilityGitDB as DulwichDB
-
-#END handle imports
+
+# END handle imports
+
class TestDulwichDBBase(RepoBase):
__metaclass__ = DulwichRequiredMetaMixin
RepoCls = DulwichDB
-
+
@needs_dulwich_or_skip
@with_rw_repo('HEAD', bare=False)
def test_basics(self, rw_repo):
db = DulwichDB(rw_repo.working_tree_dir)
-
-
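The dulwich harness above leans on `needs_module_or_skip`, whose definition appears later in this diff (git/test/lib/helper.py). The pattern for any optional backend, with an illustrative class and test name:

    from git.test.lib import needs_module_or_skip

    class TestOptionalBackend(object):

        @needs_module_or_skip('dulwich')
        def test_basics(self):
            import dulwich  # safe: the decorator already verified the import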
diff --git a/git/test/db/lib.py b/git/test/db/lib.py
index d406382a..74a6509e 100644
--- a/git/test/db/lib.py
+++ b/git/test/db/lib.py
@@ -10,15 +10,15 @@ from git.test.lib import (
fixture_path,
TestBase,
rorepo_dir,
- )
+)
from git.stream import Sha1Writer
from git.base import (
- IStream,
- OStream,
- OInfo
- )
-
+ IStream,
+ OStream,
+ OInfo
+)
+
from git.exc import BadObject
from git.typ import str_blob_type
@@ -28,41 +28,43 @@ from struct import pack
__all__ = ('TestDBBase', 'with_rw_directory', 'with_packs_rw', 'fixture_path')
-
+
+
class TestDBBase(TestBase):
+
"""Base Class providing default functionality to all tests such as:
-
+
- Utility functions provided by the TestCase base of the unittest method such as::
self.fail("todo")
self.failUnlessRaises(...)
-
+
- Class level repository which is considered read-only as it is shared among
all test cases in your type.
Access it using::
self.rorepo # 'ro' stands for read-only
-
+
The rorepo is in fact your current project's git repo. If you refer to specific
shas for your objects, be sure you choose some that are part of the immutable portion
of the project history ( to assure tests don't fail for others ).
-
+
Derived types can override the default repository type to create a different
read-only repo, allowing to test their specific type
"""
-
+
# data
two_lines = "1234\nhello world"
all_data = (two_lines, )
-
+
#{ Configuration
# The repository type to instantiate. It takes at least a path to operate upon
# during instantiation.
RepoCls = None
-
+
# if True, a read-only repo will be provided and RepoCls must be set.
# Otherwise it may remain unset
needs_ro_repo = True
#} END configuration
-
+
@classmethod
def setUp(cls):
"""
@@ -73,8 +75,8 @@ class TestDBBase(TestBase):
if cls is not TestDBBase:
assert cls.RepoCls is not None, "RepoCls class member must be set in %s" % cls
cls.rorepo = cls.RepoCls(rorepo_dir())
- #END handle rorepo
-
+ # END handle rorepo
+
def _assert_object_writing_simple(self, db):
# write a bunch of objects and query their streams and info
null_objs = db.size()
@@ -85,23 +87,22 @@ class TestDBBase(TestBase):
new_istream = db.store(istream)
assert new_istream is istream
assert db.has_object(istream.binsha)
-
+
info = db.info(istream.binsha)
assert isinstance(info, OInfo)
assert info.type == istream.type and info.size == istream.size
-
+
stream = db.stream(istream.binsha)
assert isinstance(stream, OStream)
assert stream.binsha == info.binsha and stream.type == info.type
assert stream.read() == data
# END for each item
-
+
assert db.size() == null_objs + ni
shas = list(db.sha_iter())
assert len(shas) == db.size()
assert len(shas[0]) == 20
-
-
+
def _assert_object_writing(self, db):
"""General tests to verify object writing, compatible to ObjectDBW
:note: requires write access to the database"""
@@ -115,25 +116,25 @@ class TestDBBase(TestBase):
ostream = ostreamcls()
assert isinstance(ostream, Sha1Writer)
# END create ostream
-
+
prev_ostream = db.set_ostream(ostream)
- assert type(prev_ostream) in ostreams or prev_ostream in ostreams
-
+ assert type(prev_ostream) in ostreams or prev_ostream in ostreams
+
istream = IStream(str_blob_type, len(data), StringIO(data))
-
+
# store returns same istream instance, with new sha set
my_istream = db.store(istream)
sha = istream.binsha
assert my_istream is istream
assert db.has_object(sha) != dry_run
- assert len(sha) == 20
-
+ assert len(sha) == 20
+
# verify data - the slow way, we want to run code
if not dry_run:
info = db.info(sha)
assert str_blob_type == info.type
assert info.size == len(data)
-
+
ostream = db.stream(sha)
assert ostream.read() == data
assert ostream.type == str_blob_type
@@ -141,57 +142,58 @@ class TestDBBase(TestBase):
else:
self.failUnlessRaises(BadObject, db.info, sha)
self.failUnlessRaises(BadObject, db.stream, sha)
-
+
# DIRECT STREAM COPY
# our data has been written in object format to the StringIO
# we passed as output stream. No physical database representation
# was created.
- # Test direct stream copy of object streams, the result must be
+ # Test direct stream copy of object streams, the result must be
# identical to what we fed in
ostream.seek(0)
istream.stream = ostream
assert istream.binsha is not None
prev_sha = istream.binsha
-
+
db.set_ostream(ZippedStoreShaWriter())
db.store(istream)
assert istream.binsha == prev_sha
new_ostream = db.ostream()
-
+
# note: only works as long our store write uses the same compression
# level, which is zip_best
assert ostream.getvalue() == new_ostream.getvalue()
# END for each data set
# END for each dry_run mode
-
+
def _assert_object_writing_async(self, db):
"""Test generic object writing using asynchronous access"""
ni = 5000
+
def istream_generator(offset=0, ni=ni):
for data_src in xrange(ni):
data = str(data_src + offset)
yield IStream(str_blob_type, len(data), StringIO(data))
# END for each item
# END generator utility
-
+
# for now, we are very trusty here as we expect it to work if it worked
# in the single-stream case
-
+
# write objects
reader = IteratorReader(istream_generator())
istream_reader = db.store_async(reader)
istreams = istream_reader.read() # read all
assert istream_reader.task().error() is None
assert len(istreams) == ni
-
+
for stream in istreams:
assert stream.error is None
assert len(stream.binsha) == 20
assert isinstance(stream, IStream)
# END assert each stream
-
+
# test has-object-async - we must have all previously added ones
- reader = IteratorReader( istream.binsha for istream in istreams )
+ reader = IteratorReader(istream.binsha for istream in istreams)
hasobject_reader = db.has_object_async(reader)
count = 0
for sha, has_object in hasobject_reader:
@@ -199,11 +201,11 @@ class TestDBBase(TestBase):
count += 1
# END for each sha
assert count == ni
-
+
# read the objects we have just written
- reader = IteratorReader( istream.binsha for istream in istreams )
+ reader = IteratorReader(istream.binsha for istream in istreams)
ostream_reader = db.stream_async(reader)
-
+
# read items individually to prevent hitting possible sys-limits
count = 0
for ostream in ostream_reader:
@@ -212,30 +214,29 @@ class TestDBBase(TestBase):
# END for each ostream
assert ostream_reader.task().error() is None
assert count == ni
-
+
# get info about our items
- reader = IteratorReader( istream.binsha for istream in istreams )
+ reader = IteratorReader(istream.binsha for istream in istreams)
info_reader = db.info_async(reader)
-
+
count = 0
for oinfo in info_reader:
assert isinstance(oinfo, OInfo)
count += 1
# END for each oinfo instance
assert count == ni
-
-
+
# combined read-write using a converter
# add 2500 items, and obtain their output streams
nni = 2500
reader = IteratorReader(istream_generator(offset=ni, ni=nni))
- istream_to_sha = lambda istreams: [ istream.binsha for istream in istreams ]
-
+ istream_to_sha = lambda istreams: [istream.binsha for istream in istreams]
+
istream_reader = db.store_async(reader)
istream_reader.set_post_cb(istream_to_sha)
-
+
ostream_reader = db.stream_async(istream_reader)
-
+
count = 0
# read it individually, otherwise we might run into the ulimit
for ostream in ostream_reader:
@@ -243,5 +244,3 @@ class TestDBBase(TestBase):
count += 1
# END for each ostream
assert count == nni
-
-
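`_assert_object_writing_simple` above reduces to a store-and-query round trip. A self-contained sketch against the pure loose object database, using only calls that appear in this diff; the temporary directory stands in for a real object store:

    import tempfile
    from cStringIO import StringIO

    from git.base import IStream
    from git.typ import str_blob_type
    from git.db.py.loose import PureLooseObjectODB

    db = PureLooseObjectODB(tempfile.mkdtemp())
    data = "1234\nhello world"
    istream = db.store(IStream(str_blob_type, len(data), StringIO(data)))

    assert db.has_object(istream.binsha)    # store() filled in the binsha
    assert db.stream(istream.binsha).read() == data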
diff --git a/git/test/db/py/test_base.py b/git/test/db/py/test_base.py
index 5d076bb2..cd1bed0f 100644
--- a/git/test/db/py/test_base.py
+++ b/git/test/db/py/test_base.py
@@ -7,10 +7,10 @@ from git.test.db.base import RepoBase
from git.db.complex import PureCompatibilityGitDB
+
class TestPyDBBase(RepoBase):
-
+
RepoCls = PureCompatibilityGitDB
-
+
def test_basics(self):
pass
-
diff --git a/git/test/db/py/test_git.py b/git/test/db/py/test_git.py
index 4f5b5fb5..207d2864 100644
--- a/git/test/db/py/test_git.py
+++ b/git/test/db/py/test_git.py
@@ -11,15 +11,16 @@ from git.util import hex_to_bin, bin_to_hex
import os
+
class TestGitDB(TestDBBase):
needs_ro_repo = False
-
+
def test_reading(self):
gdb = PureGitODB(os.path.join(rorepo_dir(), 'objects'))
-
+
# we have packs and loose objects, alternates doesn't necessarily exist
assert 1 < len(gdb.databases()) < 4
-
+
# access should be possible
git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655")
assert isinstance(gdb.info(git_sha), OInfo)
@@ -27,25 +28,24 @@ class TestGitDB(TestDBBase):
assert gdb.size() > 200
sha_list = list(gdb.sha_iter())
assert len(sha_list) == gdb.size()
-
-
- # This is actually a test for compound functionality, but it doesn't
+
+ # This is actually a test for compound functionality, but it doesn't
# have a separate test module
# test partial shas
# this one as uneven and quite short
assert gdb.partial_to_complete_sha_hex('5aebcd') == hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655")
-
+
# mix even/uneven hexshas
for i, binsha in enumerate(sha_list[:50]):
- assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8-(i%2)]) == binsha
+ assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8 - (i % 2)]) == binsha
# END for each sha
-
+
self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, "0000")
-
+
@with_rw_directory
def test_writing(self, path):
gdb = PureGitODB(path)
-
+
# its possible to write objects
self._assert_object_writing(gdb)
self._assert_object_writing_async(gdb)
diff --git a/git/test/db/py/test_loose.py b/git/test/db/py/test_loose.py
index cfb0ca3a..b3ffb64f 100644
--- a/git/test/db/py/test_loose.py
+++ b/git/test/db/py/test_loose.py
@@ -6,31 +6,31 @@ from git.test.db.lib import TestDBBase, with_rw_directory
from git.db.py.loose import PureLooseObjectODB
from git.exc import BadObject
from git.util import bin_to_hex
-
+
+
class TestLooseDB(TestDBBase):
-
+
needs_ro_repo = False
-
+
@with_rw_directory
def test_basics(self, path):
ldb = PureLooseObjectODB(path)
-
+
# write data
self._assert_object_writing(ldb)
self._assert_object_writing_async(ldb)
-
+
# verify sha iteration and size
shas = list(ldb.sha_iter())
assert shas and len(shas[0]) == 20
-
+
assert len(shas) == ldb.size()
-
+
# verify find short object
long_sha = bin_to_hex(shas[-1])
for short_sha in (long_sha[:20], long_sha[:5]):
assert bin_to_hex(ldb.partial_to_complete_sha_hex(short_sha)) == long_sha
# END for each sha
-
+
self.failUnlessRaises(BadObject, ldb.partial_to_complete_sha_hex, '0000')
# raises if no object could be found
-
diff --git a/git/test/db/py/test_mem.py b/git/test/db/py/test_mem.py
index bb879554..0468b8af 100644
--- a/git/test/db/py/test_mem.py
+++ b/git/test/db/py/test_mem.py
@@ -5,26 +5,27 @@
from git.test.db.lib import TestDBBase, with_rw_directory
from git.db.py.mem import PureMemoryDB
from git.db.py.loose import PureLooseObjectODB
-
+
+
class TestPureMemoryDB(TestDBBase):
-
+
needs_ro_repo = False
@with_rw_directory
def test_writing(self, path):
mdb = PureMemoryDB()
-
+
# write data
self._assert_object_writing_simple(mdb)
-
+
# test stream copy
ldb = PureLooseObjectODB(path)
assert ldb.size() == 0
num_streams_copied = mdb.stream_copy(mdb.sha_iter(), ldb)
assert num_streams_copied == mdb.size()
-
+
assert ldb.size() == mdb.size()
for sha in mdb.sha_iter():
assert ldb.has_object(sha)
- assert ldb.stream(sha).read() == mdb.stream(sha).read()
+ assert ldb.stream(sha).read() == mdb.stream(sha).read()
# END verify objects were copied and are equal
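stream_copy, exercised above, lets the in-memory database act as a staging area that can be flushed into any other database. A short sketch reusing the classes from this hunk; the destination directory is illustrative:

    import tempfile
    from git.db.py.mem import PureMemoryDB
    from git.db.py.loose import PureLooseObjectODB

    mdb = PureMemoryDB()
    # ... store objects into mdb as in the loose-db sketch earlier ...
    ldb = PureLooseObjectODB(tempfile.mkdtemp())
    copied = mdb.stream_copy(mdb.sha_iter(), ldb)
    assert copied == mdb.size()     # every staged object now exists in ldb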
diff --git a/git/test/db/py/test_pack.py b/git/test/db/py/test_pack.py
index 54dc2e2c..2cb7ea70 100644
--- a/git/test/db/py/test_pack.py
+++ b/git/test/db/py/test_pack.py
@@ -12,48 +12,48 @@ from git.exc import BadObject, AmbiguousObjectName
import os
import random
+
class TestPackDB(TestDBBase):
-
- needs_ro_repo = False
-
+
+ needs_ro_repo = False
+
@with_packs_rw
def test_writing(self, path):
pdb = PurePackedODB(path)
-
+
# on demand, we init our pack cache
num_packs = len(pdb.entities())
assert num_packs
assert pdb._st_mtime != 0
-
- # test pack directory changed:
+
+ # test pack directory changed:
# packs removed - rename a file, should affect the glob
pack_path = pdb.entities()[0].pack().path()
new_pack_path = pack_path + "renamed"
os.rename(pack_path, new_pack_path)
-
+
pdb.update_cache(force=True)
assert len(pdb.entities()) == num_packs - 1
-
+
# packs added
os.rename(new_pack_path, pack_path)
pdb.update_cache(force=True)
assert len(pdb.entities()) == num_packs
-
+
# bang on the cache
# access the Entities directly, as there is no iteration interface
# yet ( or required for now )
sha_list = list(pdb.sha_iter())
assert len(sha_list) == pdb.size()
-
+
# hit all packs in random order
random.shuffle(sha_list)
-
+
for sha in sha_list:
info = pdb.info(sha)
stream = pdb.stream(sha)
# END for each sha to query
-
-
+
# test short finding - be a bit more brutal here
max_bytes = 19
min_bytes = 2
@@ -61,16 +61,16 @@ class TestPackDB(TestDBBase):
for i, sha in enumerate(sha_list):
short_sha = sha[:max((i % max_bytes), min_bytes)]
try:
- assert pdb.partial_to_complete_sha(short_sha, len(short_sha)*2) == sha
+ assert pdb.partial_to_complete_sha(short_sha, len(short_sha) * 2) == sha
except AmbiguousObjectName:
num_ambiguous += 1
- pass # valid, we can have short objects
+ pass # valid, we can have short objects
# END exception handling
# END for each sha to find
-
+
# we should have at least one ambiguous, considering the small sizes
- # but in our pack, there is no ambigious ...
+ # but in our pack, there is no ambiguous ...
# assert num_ambiguous
-
+
# non-existing
self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, "\0\0", 4)
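partial_to_complete_sha above takes a binary prefix plus the prefix length in hex digits, and AmbiguousObjectName is a legitimate outcome for very short prefixes. The calling convention in isolation; the import path for PurePackedODB is assumed by analogy with the other pure databases, and the pack directory is illustrative:

    from git.exc import AmbiguousObjectName
    from git.db.py.pack import PurePackedODB

    pdb = PurePackedODB('/path/to/pack/dir')
    sha = list(pdb.sha_iter())[0]       # any known 20-byte binary sha
    try:
        # two hex digits per prefix byte, hence len * 2
        assert pdb.partial_to_complete_sha(sha[:2], 4) == sha
    except AmbiguousObjectName:
        pass    # several packed objects share that short prefix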
diff --git a/git/test/db/py/test_ref.py b/git/test/db/py/test_ref.py
index dfaf9644..4b5dd134 100644
--- a/git/test/db/py/test_ref.py
+++ b/git/test/db/py/test_ref.py
@@ -6,16 +6,17 @@ from git.test.db.lib import *
from git.db.py.ref import PureReferenceDB
from git.util import (
- NULL_BIN_SHA,
- hex_to_bin
- )
+ NULL_BIN_SHA,
+ hex_to_bin
+)
import os
-
+
+
class TestPureReferenceDB(TestDBBase):
-
+
needs_ro_repo = False
-
+
def make_alt_file(self, alt_path, alt_list):
"""Create an alternates file which contains the given alternates.
The list can be empty"""
@@ -23,40 +24,37 @@ class TestPureReferenceDB(TestDBBase):
for alt in alt_list:
alt_file.write(alt + "\n")
alt_file.close()
-
+
@with_rw_directory
def test_writing(self, path):
- NULL_BIN_SHA = '\0' * 20
-
+ NULL_BIN_SHA = '\0' * 20
+
alt_path = os.path.join(path, 'alternates')
rdb = PureReferenceDB(alt_path)
assert len(rdb.databases()) == 0
assert rdb.size() == 0
assert len(list(rdb.sha_iter())) == 0
-
+
# try empty, non-existing
assert not rdb.has_object(NULL_BIN_SHA)
-
-
+
# setup alternate file
# add two, one is invalid
own_repo_path = fixture_path('../../../.git/objects') # use own repo
self.make_alt_file(alt_path, [own_repo_path, "invalid/path"])
rdb.update_cache()
assert len(rdb.databases()) == 1
-
+
# we should now find a default revision of ours
git_sha = hex_to_bin("5aebcd5cb3340fb31776941d7e4d518a712a8655")
assert rdb.has_object(git_sha)
-
+
# remove valid
self.make_alt_file(alt_path, ["just/one/invalid/path"])
rdb.update_cache()
assert len(rdb.databases()) == 0
-
+
# add valid
self.make_alt_file(alt_path, [own_repo_path])
rdb.update_cache()
assert len(rdb.databases()) == 1
-
-
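The alternates file driven by make_alt_file above is nothing more than a text file listing object directories, one per line, re-read on update_cache(); entries that do not resolve are dropped. A sketch with illustrative paths:

    import os
    from git.db.py.ref import PureReferenceDB

    alt_path = os.path.join('/tmp', 'alternates')           # illustrative
    alt_file = open(alt_path, 'wb')
    alt_file.write('/path/to/some/repo/.git/objects\n')     # one path per line
    alt_file.close()

    rdb = PureReferenceDB(alt_path)
    rdb.update_cache()          # re-reads the file, drops invalid entries
    print len(rdb.databases())  # 1 if the path above exists, else 0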
diff --git a/git/test/db/pygit2/lib.py b/git/test/db/pygit2/lib.py
index fab762e7..76441333 100644
--- a/git/test/db/pygit2/lib.py
+++ b/git/test/db/pygit2/lib.py
@@ -1,14 +1,15 @@
"""pygit2 specific utilities, as well as all the default ones"""
from git.test.lib import (
- InheritedTestMethodsOverrideWrapperMetaClsAutoMixin,
- needs_module_or_skip
- )
+ InheritedTestMethodsOverrideWrapperMetaClsAutoMixin,
+ needs_module_or_skip
+)
__all__ = ['needs_pygit2_or_skip', 'Pygit2RequiredMetaMixin']
#{ Decorators
+
def needs_pygit2_or_skip(func):
"""Skip this test if we have no pygit2 - print warning"""
return needs_module_or_skip('pygit2')(func)
@@ -17,6 +18,7 @@ def needs_pygit2_or_skip(func):
#{ MetaClasses
+
class Pygit2RequiredMetaMixin(InheritedTestMethodsOverrideWrapperMetaClsAutoMixin):
decorator = [needs_pygit2_or_skip]
diff --git a/git/test/db/pygit2/test_base.py b/git/test/db/pygit2/test_base.py
index 52ee24f5..dc1b0ac5 100644
--- a/git/test/db/pygit2/test_base.py
+++ b/git/test/db/pygit2/test_base.py
@@ -7,7 +7,6 @@ from git.test.lib import TestBase, with_rw_repo
from git.test.db.base import RepoBase
-
try:
import pygit2
except ImportError:
@@ -17,16 +16,15 @@ except ImportError:
else:
# now we know pygit2 is available, to do futher imports
from git.db.pygit2.complex import Pygit2CompatibilityGitDB as Pygit2DB
-
-#END handle imports
+
+# END handle imports
+
class TestPyGit2DBBase(RepoBase):
__metaclass__ = Pygit2RequiredMetaMixin
RepoCls = Pygit2DB
-
+
@needs_pygit2_or_skip
@with_rw_repo('HEAD', bare=False)
def test_basics(self, rw_repo):
db = Pygit2DB(rw_repo.working_tree_dir)
-
-
diff --git a/git/test/db/test_base.py b/git/test/db/test_base.py
index 78da9f04..39c935a6 100644
--- a/git/test/db/test_base.py
+++ b/git/test/db/test_base.py
@@ -5,6 +5,7 @@
from lib import *
from git.db import RefSpec
+
class TestBase(TestDBBase):
needs_ro_repo = False
@@ -17,4 +18,3 @@ class TestBase(TestDBBase):
assert rs.delete_destination()
assert rs.source is None
assert rs.destination == "something"
-
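The RefSpec assertions above imply a two-argument constructor; a hedged reading in which the (source, destination) order is inferred from those assertions rather than shown in this hunk:

    from git.db import RefSpec

    rs = RefSpec(None, "something")     # assumption: RefSpec(source, destination)
    assert rs.delete_destination()      # a None source means: delete at destination
    assert rs.source is None
    assert rs.destination == "something"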
diff --git a/git/test/lib/__init__.py b/git/test/lib/__init__.py
index a0656438..a94b6617 100644
--- a/git/test/lib/__init__.py
+++ b/git/test/lib/__init__.py
@@ -5,7 +5,7 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
import inspect
-# TODO: Separate names - they do repeat unfortunately. Also deduplicate it,
+# TODO: Separate names - they do repeat unfortunately. Also deduplicate it,
# redesign decorators to support multiple database types in succession.
from base import *
@@ -14,5 +14,5 @@ from asserts import *
from helper import *
-__all__ = [ name for name, obj in locals().items()
- if not (name.startswith('_') or inspect.ismodule(obj)) ]
+__all__ = [name for name, obj in locals().items()
+ if not (name.startswith('_') or inspect.ismodule(obj))]
diff --git a/git/test/lib/asserts.py b/git/test/lib/asserts.py
index fa754b92..351901dc 100644
--- a/git/test/lib/asserts.py
+++ b/git/test/lib/asserts.py
@@ -10,41 +10,49 @@ from nose import tools
from nose.tools import *
import stat
-__all__ = ['assert_instance_of', 'assert_not_instance_of',
+__all__ = ['assert_instance_of', 'assert_not_instance_of',
'assert_none', 'assert_not_none',
'assert_match', 'assert_not_match', 'assert_mode_644',
'assert_mode_755'] + tools.__all__
+
def assert_instance_of(expected, actual, msg=None):
"""Verify that object is an instance of expected """
assert isinstance(actual, expected), msg
+
def assert_not_instance_of(expected, actual, msg=None):
"""Verify that object is not an instance of expected """
assert not isinstance(actual, expected, msg)
-
+
+
def assert_none(actual, msg=None):
"""verify that item is None"""
assert actual is None, msg
+
def assert_not_none(actual, msg=None):
"""verify that item is None"""
assert actual is not None, msg
+
def assert_match(pattern, string, msg=None):
"""verify that the pattern matches the string"""
assert_not_none(re.search(pattern, string), msg)
+
def assert_not_match(pattern, string, msg=None):
"""verify that the pattern does not match the string"""
assert_none(re.search(pattern, string), msg)
-
+
+
def assert_mode_644(mode):
"""Verify given mode is 644"""
- assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP)
+ assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP)
assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and not (mode & stat.S_IXUSR)
+
def assert_mode_755(mode):
"""Verify given mode is 755"""
assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP) and (mode & stat.S_IXOTH) and (mode & stat.S_IXGRP)
- assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and (mode & stat.S_IXUSR)
\ No newline at end of file
+ assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and (mode & stat.S_IXUSR)
diff --git a/git/test/lib/base.py b/git/test/lib/base.py
index 298e8e05..39bc9b73 100644
--- a/git/test/lib/base.py
+++ b/git/test/lib/base.py
@@ -4,15 +4,15 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Utilities used in ODB testing"""
from git.base import OStream
-from git.stream import (
- Sha1Writer,
- ZippedStoreShaWriter
- )
+from git.stream import (
+ Sha1Writer,
+ ZippedStoreShaWriter
+)
from git.util import (
- zlib,
- dirname
- )
+ zlib,
+ dirname
+)
import sys
import random
@@ -32,6 +32,7 @@ import gc
def with_rw_directory(func):
"""Create a temporary directory which can be written to, remove it if the
test succeeds, but leave it otherwise to aid additional debugging"""
+
def wrapper(self):
path = maketemp(prefix=func.__name__)
os.mkdir(path)
@@ -45,7 +46,7 @@ def with_rw_directory(func):
raise
finally:
# Need to collect here to be sure all handles have been closed. It appears
- # a windows-only issue. In fact things should be deleted, as well as
+ # a windows-only issue. In fact things should be deleted, as well as
# memory maps closed, once objects go out of scope. For some reason
# though this is not the case here unless we collect explicitly.
if not keep:
@@ -53,7 +54,7 @@ def with_rw_directory(func):
shutil.rmtree(path)
# END handle exception
# END wrapper
-
+
wrapper.__name__ = func.__name__
return wrapper
@@ -65,6 +66,7 @@ def with_rw_repo(func):
being on a certain branch or on anything really except for the default tags
that should exist
Wrapped function obtains a git repository """
+
def wrapper(self, path):
src_dir = dirname(dirname(dirname(__file__)))
assert(os.path.isdir(path))
@@ -73,24 +75,24 @@ def with_rw_repo(func):
target_gitdir = os.path.join(path, '.git')
assert os.path.isdir(target_gitdir)
return func(self, self.RepoCls(target_gitdir))
- #END wrapper
+ # END wrapper
wrapper.__name__ = func.__name__
return with_rw_directory(wrapper)
-
def with_packs_rw(func):
"""Function that provides a path into which the packs for testing should be
copied. Will pass on the path to the actual function afterwards
-
+
:note: needs with_rw_directory wrapped around it"""
+
def wrapper(self, path):
src_pack_glob = fixture_path('packs/*')
print src_pack_glob
copy_files_globbed(src_pack_glob, path, hard_link_ok=True)
return func(self, path)
# END wrapper
-
+
wrapper.__name__ = func.__name__
return with_rw_directory(wrapper)
@@ -98,6 +100,7 @@ def with_packs_rw(func):
#{ Routines
+
def rorepo_dir():
""":return: path to our own repository, being our own .git directory.
:note: doesn't work in bare repositories"""
@@ -105,6 +108,7 @@ def rorepo_dir():
assert os.path.isdir(base)
return base
+
def maketemp(*args, **kwargs):
"""Wrapper around default tempfile.mktemp to fix an osx issue"""
tdir = tempfile.mktemp(*args, **kwargs)
@@ -112,19 +116,23 @@ def maketemp(*args, **kwargs):
tdir = '/private' + tdir
return tdir
+
def fixture_path(relapath=''):
""":return: absolute path into the fixture directory
:param relapath: relative path into the fixtures directory, or ''
to obtain the fixture directory itself"""
test_dir = os.path.dirname(os.path.dirname(__file__))
return os.path.join(test_dir, "fixtures", relapath)
-
+
+
def fixture(name):
return open(fixture_path(name), 'rb').read()
+
def absolute_project_path():
return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+
def copy_files_globbed(source_glob, target_dir, hard_link_ok=False):
"""Copy all files found according to the given source glob into the target directory
:param hard_link_ok: if True, hard links will be created if possible. Otherwise
@@ -141,7 +149,7 @@ def copy_files_globbed(source_glob, target_dir, hard_link_ok=False):
shutil.copy(src_file, target_dir)
# END try hard link
# END for each file to copy
-
+
def make_bytes(size_in_bytes, randomize=False):
""":return: string with given size in bytes
@@ -155,11 +163,13 @@ def make_bytes(size_in_bytes, randomize=False):
a = array('i', producer)
return a.tostring()
+
def make_object(type, data):
""":return: bytes resembling an uncompressed object"""
odata = "blob %i\0" % len(data)
return odata + data
-
+
+
def make_memory_file(size_in_bytes, randomize=False):
""":return: tuple(size_of_stream, stream)
:param randomize: try to produce a very random stream"""
@@ -170,31 +180,33 @@ def make_memory_file(size_in_bytes, randomize=False):
#{ Stream Utilities
+
class DummyStream(object):
- def __init__(self):
- self.was_read = False
- self.bytes = 0
- self.closed = False
-
- def read(self, size):
- self.was_read = True
- self.bytes = size
-
- def close(self):
- self.closed = True
-
- def _assert(self):
- assert self.was_read
+
+ def __init__(self):
+ self.was_read = False
+ self.bytes = 0
+ self.closed = False
+
+ def read(self, size):
+ self.was_read = True
+ self.bytes = size
+
+ def close(self):
+ self.closed = True
+
+ def _assert(self):
+ assert self.was_read
class DeriveTest(OStream):
+
def __init__(self, sha, type, size, stream, *args, **kwargs):
self.myarg = kwargs.pop('myarg')
self.args = args
-
+
def _assert(self):
assert self.args
assert self.myarg
#} END stream utilities
-
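Every decorator above funnels through with_rw_directory. How a test consumes it, matching the wrapper signature in this diff; the test class and body are illustrative:

    import os
    from git.test.lib.base import with_rw_directory

    class ExampleTest(object):

        @with_rw_directory
        def test_scratch_space(self, path):
            # path is a fresh writable directory, removed on success and
            # kept on failure to aid debugging, as the docstring describes
            open(os.path.join(path, 'scratch'), 'wb').close()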
diff --git a/git/test/lib/helper.py b/git/test/lib/helper.py
index bb17745c..db6eb121 100644
--- a/git/test/lib/helper.py
+++ b/git/test/lib/helper.py
@@ -16,36 +16,37 @@ import warnings
from nose import SkipTest
from base import (
- maketemp,
- rorepo_dir
- )
+ maketemp,
+ rorepo_dir
+)
__all__ = (
- 'StringProcessAdapter', 'GlobalsItemDeletorMetaCls', 'InheritedTestMethodsOverrideWrapperMetaClsAutoMixin',
- 'with_rw_repo', 'with_rw_and_rw_remote_repo', 'TestBase', 'TestCase', 'needs_module_or_skip'
- )
+ 'StringProcessAdapter', 'GlobalsItemDeletorMetaCls', 'InheritedTestMethodsOverrideWrapperMetaClsAutoMixin',
+ 'with_rw_repo', 'with_rw_and_rw_remote_repo', 'TestBase', 'TestCase', 'needs_module_or_skip'
+)
-
-#{ Adapters
-
+#{ Adapters
+
class StringProcessAdapter(object):
+
"""Allows to use strings as Process object as returned by SubProcess.Popen.
Its tailored to work with the test system only"""
-
+
def __init__(self, input_string):
self.stdout = cStringIO.StringIO(input_string)
self.stderr = cStringIO.StringIO()
-
+
def wait(self):
return 0
-
+
poll = wait
-
+
#} END adapters
-#{ Decorators
+#{ Decorators
+
def _rmtree_onerror(osremove, fullpath, exec_info):
"""
@@ -54,35 +55,37 @@ def _rmtree_onerror(osremove, fullpath, exec_info):
"""
if os.name != 'nt' or osremove is not os.remove:
raise
-
+
os.chmod(fullpath, 0777)
os.remove(fullpath)
+
def with_rw_repo(working_tree_ref, bare=False):
"""
Same as with_bare_repo, but clones the rorepo as non-bare repository, checking
out the working tree at the given working_tree_ref.
-
+
This repository type is more costly due to the working copy checkout.
-
+
To make working with relative paths easier, the cwd will be set to the working
dir of the repository.
"""
assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout"
+
def argument_passer(func):
def repo_creator(self):
prefix = 'non_'
if bare:
prefix = ''
- #END handle prefix
+ # END handle prefix
repo_dir = maketemp("%sbare_%s" % (prefix, func.__name__))
rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=bare, n=True)
-
+
rw_repo.head.commit = rw_repo.commit(working_tree_ref)
if not bare:
rw_repo.head.reference.checkout()
# END handle checkout
-
+
prev_cwd = os.getcwd()
os.chdir(rw_repo.working_dir)
try:
@@ -104,7 +107,8 @@ def with_rw_repo(working_tree_ref, bare=False):
return repo_creator
# END argument passer
return argument_passer
-
+
+
def with_rw_and_rw_remote_repo(working_tree_ref):
"""
Same as with_rw_repo, but also provides a writable remote repository from which the
@@ -112,36 +116,38 @@ def with_rw_and_rw_remote_repo(working_tree_ref):
run the remote_repo.
The remote repository was cloned as a bare repository from the rorepo, whereas
the rw repo has a working tree and was cloned from the remote repository.
-
+
remote_repo has two remotes: origin and daemon_origin. One uses a local url,
the other uses a server url. The daemon setup must be done on system level
and should be an inetd service that serves tempdir.gettempdir() and all
directories in it.
-
+
The following sketch demonstrates this::
rorepo ---<bare clone>---> rw_remote_repo ---<clone>---> rw_repo
-
+
The test case needs to support the following signature::
def case(self, rw_repo, rw_remote_repo)
-
+
This setup allows you to test push and pull scenarios and hooks nicely.
-
+
See working dir info in with_rw_repo
"""
assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout"
+
def argument_passer(func):
def remote_repo_creator(self):
remote_repo_dir = maketemp("remote_repo_%s" % func.__name__)
repo_dir = maketemp("remote_clone_non_bare_repo")
-
+
rw_remote_repo = self.rorepo.clone(remote_repo_dir, shared=True, bare=True)
- rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True) # recursive alternates info ?
+ # recursive alternates info ?
+ rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True)
rw_repo.head.commit = working_tree_ref
rw_repo.head.reference.checkout()
-
+
# prepare for git-daemon
rw_remote_repo.daemon_export = True
-
+
# this thing is just annoying !
crw = rw_remote_repo.config_writer()
section = "daemon"
@@ -152,28 +158,30 @@ def with_rw_and_rw_remote_repo(working_tree_ref):
crw.set(section, "receivepack", True)
# release lock
del(crw)
-
- # initialize the remote - first do it as local remote and pull, then
+
+ # initialize the remote - first do it as local remote and pull, then
# we change the url to point to the daemon. The daemon should be started
# by the user, not by us
d_remote = Remote.create(rw_repo, "daemon_origin", remote_repo_dir)
d_remote.fetch()
remote_repo_url = "git://localhost%s" % remote_repo_dir
-
+
d_remote.config_writer.set('url', remote_repo_url)
-
+
+ # try to list remotes to diagnose whether the server is up
try:
rw_repo.git.ls_remote(d_remote)
- except GitCommandError,e:
+ except GitCommandError, e:
print str(e)
if os.name == 'nt':
- raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"' % os.path.dirname(_mktemp()))
+ raise AssertionError(
+ 'git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"' % os.path.dirname(_mktemp()))
else:
- raise AssertionError('Please start a git-daemon to run this test, execute: git-daemon "%s"' % os.path.dirname(_mktemp()))
+ raise AssertionError(
+ 'Please start a git-daemon to run this test, execute: git-daemon "%s"' % os.path.dirname(_mktemp()))
# END make assertion
- #END catch ls remote error
-
+ # END catch ls remote error
+
# adjust working dir
prev_cwd = os.getcwd()
os.chdir(rw_repo.working_dir)
@@ -191,9 +199,10 @@ def with_rw_and_rw_remote_repo(working_tree_ref):
return remote_repo_creator
# END remote repo creator
    # END argument parser
-
+
return argument_passer
-
+
+
def needs_module_or_skip(module):
"""Decorator to be used for test cases only.
Print a warning if the given module could not be imported, and skip the test.
@@ -207,25 +216,28 @@ def needs_module_or_skip(module):
msg = "Module %r is required to run this test - skipping" % module
warnings.warn(msg)
raise SkipTest(msg)
- #END check import
+ # END check import
return func(self, *args, **kwargs)
- #END wrapper
+ # END wrapper
wrapper.__name__ = func.__name__
return wrapper
- #END argpasser
+ # END argpasser
return argpasser
-
+
#} END decorators
#{ Meta Classes
+
+
class GlobalsItemDeletorMetaCls(type):
+
"""Utiltiy to prevent the RepoBase to be picked up by nose as the metacls
will delete the instance from the globals"""
#{ Configuration
# Set this to a string name of the module to delete
ModuleToDelete = None
#} END configuration
-
+
def __new__(metacls, name, bases, clsdict):
assert metacls.ModuleToDelete is not None, "Invalid metaclass configuration"
new_type = super(GlobalsItemDeletorMetaCls, metacls).__new__(metacls, name, bases, clsdict)
@@ -235,22 +247,23 @@ class GlobalsItemDeletorMetaCls(type):
delattr(mod, metacls.ModuleToDelete)
except AttributeError:
pass
- #END skip case that people import our base without actually using it
- #END handle deletion
+ # END skip case that people import our base without actually using it
+ # END handle deletion
return new_type
-
-
+
+
class InheritedTestMethodsOverrideWrapperMetaClsAutoMixin(object):
+
"""Automatically picks up the actual metaclass of the the type to be created,
that is the one inherited by one of the bases, and patch up its __new__ to use
the InheritedTestMethodsOverrideWrapperInstanceDecorator with our configured decorator"""
-
+
#{ Configuration
    # decorator function to use when wrapping the inherited methods. Put it into a list as the first member
    # to hide it from being created as a class method
decorator = []
#}END configuration
-
+
@classmethod
def _find_metacls(metacls, bases):
"""emulate pythons lookup"""
@@ -259,9 +272,9 @@ class InheritedTestMethodsOverrideWrapperMetaClsAutoMixin(object):
if hasattr(base, mcls_attr):
return getattr(base, mcls_attr)
return metacls._find_metacls(base.__bases__)
- #END for each base
+ # END for each base
        raise AssertionError("base class had no metaclass attached")
-
+
@classmethod
def _patch_methods_recursive(metacls, bases, clsdict):
"""depth-first patching of methods"""
@@ -270,33 +283,35 @@ class InheritedTestMethodsOverrideWrapperMetaClsAutoMixin(object):
for name, item in base.__dict__.iteritems():
if not name.startswith('test_'):
continue
- #END skip non-tests
+ # END skip non-tests
clsdict[name] = metacls.decorator[0](item)
- #END for each item
- #END for each base
-
+ # END for each item
+ # END for each base
+
def __new__(metacls, name, bases, clsdict):
assert metacls.decorator, "'decorator' member needs to be set in subclass"
base_metacls = metacls._find_metacls(bases)
metacls._patch_methods_recursive(bases, clsdict)
return base_metacls.__new__(base_metacls, name, bases, clsdict)
-
+
#} END meta classes
-
+
+
class TestBase(TestCase):
+
"""
Base Class providing default functionality to all tests such as:
    - Utility functions provided by the TestCase base of the unittest module, such as::
self.fail("todo")
self.failUnlessRaises(...)
"""
-
+
@classmethod
def setUp(cls):
"""This method is only called to provide the most basic functionality
Subclasses may just override it or implement it differently"""
cls.rorepo = Repo(rorepo_dir())
-
+
def _make_file(self, rela_path, data, repo=None):
"""
Create a file at the given path relative to our repository, filled
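
The with_rw_repo decorator above follows a single setup/teardown shape: create a scratch clone, chdir into its working tree, run the test, then restore the previous cwd and remove the clone even when the test fails. A minimal sketch of that shape, assuming a plain temporary directory in place of the repository clone:

import os
import shutil
import tempfile
import functools

def with_scratch_dir(func):
    """Sketch of the with_rw_repo wrapping pattern: set up, chdir,
    run the test, then restore the cwd and clean up unconditionally."""
    @functools.wraps(func)
    def wrapper(self):
        path = tempfile.mkdtemp(prefix=func.__name__)
        prev_cwd = os.getcwd()
        os.chdir(path)
        try:
            return func(self, path)
        finally:
            os.chdir(prev_cwd)
            shutil.rmtree(path)
    return wrapper

functools.wraps also covers the manual wrapper.__name__ = func.__name__ copy that needs_module_or_skip performs above.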
diff --git a/git/test/objects/__init__.py b/git/test/objects/__init__.py
index 8b137891..e69de29b 100644
--- a/git/test/objects/__init__.py
+++ b/git/test/objects/__init__.py
@@ -1 +0,0 @@
-
diff --git a/git/test/objects/lib.py b/git/test/objects/lib.py
index e3860ba5..08ecaa2a 100644
--- a/git/test/objects/lib.py
+++ b/git/test/objects/lib.py
@@ -1,14 +1,16 @@
"""Provide customized obhject testing facilities"""
from git.test.lib import (
- rorepo_dir,
- TestBase,
- assert_equal,
- assert_not_equal,
- with_rw_repo,
- StringProcessAdapter,
- )
+ rorepo_dir,
+ TestBase,
+ assert_equal,
+ assert_not_equal,
+ with_rw_repo,
+ StringProcessAdapter,
+)
+
class TestObjectBase(TestBase):
+
"""Provides a default read-only repository in the rorepo member"""
pass
diff --git a/git/test/objects/test_blob.py b/git/test/objects/test_blob.py
index 978ab931..96fccd44 100644
--- a/git/test/objects/test_blob.py
+++ b/git/test/objects/test_blob.py
@@ -8,16 +8,16 @@ from lib import *
from git.objects.blob import *
from git.util import hex_to_bin
+
class TestBlob(TestObjectBase):
-
+
def test_mime_type_should_return_mime_type_for_known_types(self):
blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA, 'path': 'foo.png'})
assert_equal("image/png", blob.mime_type)
-
+
def test_mime_type_should_return_text_plain_for_unknown_types(self):
- blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA,'path': 'something'})
+ blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA, 'path': 'something'})
assert_equal("text/plain", blob.mime_type)
-
+
def test_nodict(self):
self.failUnlessRaises(AttributeError, setattr, self.rorepo.tree()['AUTHORS'], 'someattr', 2)
-
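
The two mime_type tests above pin down the contract: a known extension resolves to its MIME type, anything else falls back to text/plain. A sketch of that contract with the stdlib mimetypes module - an assumption about the kind of lookup Blob.mime_type performs, not a copy of it:

import mimetypes

def guess_mime_type(path, default='text/plain'):
    # known extensions resolve; unknown ones fall back to the default
    guessed, _encoding = mimetypes.guess_type(path)
    return guessed or default

assert guess_mime_type('foo.png') == 'image/png'
assert guess_mime_type('something') == 'text/plain'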
diff --git a/git/test/objects/test_commit.py b/git/test/objects/test_commit.py
index 1b8b69c7..996c4367 100644
--- a/git/test/objects/test_commit.py
+++ b/git/test/objects/test_commit.py
@@ -10,9 +10,9 @@ from git.objects.commit import *
from git.base import IStream
from git.util import (
- hex_to_bin,
- Actor,
- )
+ hex_to_bin,
+ Actor,
+)
from cStringIO import StringIO
import time
@@ -25,49 +25,50 @@ def assert_commit_serialization(rwrepo, commit_id, print_performance_info=False)
:param print_performance_info: if True, we will show how fast we are"""
ns = 0 # num serializations
nds = 0 # num deserializations
-
+
st = time.time()
for cm in rwrepo.commit(commit_id).traverse():
nds += 1
-
- # assert that we deserialize commits correctly, hence we get the same
+
+ # assert that we deserialize commits correctly, hence we get the same
# sha on serialization
stream = StringIO()
cm._serialize(stream)
ns += 1
streamlen = stream.tell()
stream.seek(0)
-
+
istream = rwrepo.odb.store(IStream(Commit.type, streamlen, stream))
assert istream.hexsha == cm.hexsha
-
+
nc = Commit(rwrepo, Commit.NULL_BIN_SHA, cm.tree,
- cm.author, cm.authored_date, cm.author_tz_offset,
- cm.committer, cm.committed_date, cm.committer_tz_offset,
- cm.message, cm.parents, cm.encoding)
-
+ cm.author, cm.authored_date, cm.author_tz_offset,
+ cm.committer, cm.committed_date, cm.committer_tz_offset,
+ cm.message, cm.parents, cm.encoding)
+
assert nc.parents == cm.parents
stream = StringIO()
nc._serialize(stream)
ns += 1
streamlen = stream.tell()
stream.seek(0)
-
+
# reuse istream
istream.size = streamlen
istream.stream = stream
istream.binsha = None
nc.binsha = rwrepo.odb.store(istream).binsha
-
+
# if it worked, we have exactly the same contents !
assert nc.hexsha == cm.hexsha
# END check commits
elapsed = time.time() - st
-
+
if print_performance_info:
- print >> sys.stderr, "Serialized %i and deserialized %i commits in %f s ( (%f, %f) commits / s" % (ns, nds, elapsed, ns/elapsed, nds/elapsed)
+        print >> sys.stderr, "Serialized %i and deserialized %i commits in %f s ( (%f, %f) commits / s )" % (
+ ns, nds, elapsed, ns / elapsed, nds / elapsed)
# END handle performance info
-
+
class TestCommit(TestObjectBase):
@@ -76,7 +77,7 @@ class TestCommit(TestObjectBase):
commit = self.rorepo.commit('2454ae89983a4496a445ce347d7a41c0bb0ea7ae')
# commits have no dict
self.failUnlessRaises(AttributeError, setattr, commit, 'someattr', 1)
- commit.author # bake
+ commit.author # bake
assert_equal("Sebastian Thiel", commit.author.name)
assert_equal("byronimo@gmail.com", commit.author.email)
@@ -85,26 +86,25 @@ class TestCommit(TestObjectBase):
assert isinstance(commit.author_tz_offset, int) and isinstance(commit.committer_tz_offset, int)
assert commit.message == "Added missing information to docstrings of commit and stats module\n"
-
def test_stats(self):
commit = self.rorepo.commit('33ebe7acec14b25c5f84f35a664803fcab2f7781')
stats = commit.stats
-
+
def check_entries(d):
assert isinstance(d, dict)
for key in ("insertions", "deletions", "lines"):
assert key in d
- # END assertion helper
- assert stats.files
+ # END assertion helper
+ assert stats.files
assert stats.total
-
- check_entries(stats.total)
+
+ check_entries(stats.total)
assert "files" in stats.total
-
+
for filepath, d in stats.files.items():
check_entries(d)
# END for each stated file
-
+
# assure data is parsed properly
michael = Actor._from_string("Michael Trier <mtrier@gmail.com>")
assert commit.author == michael
@@ -114,7 +114,7 @@ class TestCommit(TestObjectBase):
assert commit.author_tz_offset == 14400, commit.author_tz_offset
assert commit.committer_tz_offset == 14400, commit.committer_tz_offset
assert commit.message == "initial project\n"
-
+
def test_unicode_actor(self):
# assure we can parse unicode actors correctly
name = "Üäöß ÄußÉ".decode("utf-8")
@@ -122,7 +122,7 @@ class TestCommit(TestObjectBase):
special = Actor._from_string(u"%s <something@this.com>" % name)
assert special.name == name
assert isinstance(special.name, unicode)
-
+
def test_traversal(self):
start = self.rorepo.commit("a4d06724202afccd2b5c54f81bcf2bf26dea7fff")
first = self.rorepo.commit("33ebe7acec14b25c5f84f35a664803fcab2f7781")
@@ -130,73 +130,73 @@ class TestCommit(TestObjectBase):
p1 = start.parents[1]
p00 = p0.parents[0]
p10 = p1.parents[0]
-
+
# basic branch first, depth first
dfirst = start.traverse(branch_first=False)
bfirst = start.traverse(branch_first=True)
assert dfirst.next() == p0
assert dfirst.next() == p00
-
+
assert bfirst.next() == p0
assert bfirst.next() == p1
assert bfirst.next() == p00
assert bfirst.next() == p10
-
+
# at some point, both iterations should stop
assert list(bfirst)[-1] == first
stoptraverse = self.rorepo.commit("254d04aa3180eb8b8daf7b7ff25f010cd69b4e7d").traverse(as_edge=True)
l = list(stoptraverse)
assert len(l[0]) == 2
-
+
# ignore self
assert start.traverse(ignore_self=False).next() == start
-
- # depth
+
+ # depth
assert len(list(start.traverse(ignore_self=False, depth=0))) == 1
-
+
# prune
- assert start.traverse(branch_first=1, prune=lambda i,d: i==p0).next() == p1
-
+ assert start.traverse(branch_first=1, prune=lambda i, d: i == p0).next() == p1
+
# predicate
- assert start.traverse(branch_first=1, predicate=lambda i,d: i==p1).next() == p1
-
+ assert start.traverse(branch_first=1, predicate=lambda i, d: i == p1).next() == p1
+
# traversal should stop when the beginning is reached
self.failUnlessRaises(StopIteration, first.traverse().next)
-
- # parents of the first commit should be empty ( as the only parent has a null
+
+ # parents of the first commit should be empty ( as the only parent has a null
# sha )
assert len(first.parents) == 0
-
+
def test_iteration(self):
# we can iterate commits
all_commits = Commit.list_items(self.rorepo, self.rorepo.head)
assert all_commits
assert all_commits == list(self.rorepo.iter_commits())
-
+
# this includes merge commits
        mcommit = self.rorepo.commit('d884adc80c80300b4cc05321494713904ef1df2d')
        assert mcommit in all_commits
-
+
# we can limit the result to paths
ltd_commits = list(self.rorepo.iter_commits(paths='CHANGES'))
assert ltd_commits and len(ltd_commits) < len(all_commits)
-
+
# show commits of multiple paths, resulting in a union of commits
less_ltd_commits = list(Commit.iter_items(self.rorepo, 'master', paths=('CHANGES', 'AUTHORS')))
assert len(ltd_commits) < len(less_ltd_commits)
-
+
def test_iter_items(self):
# pretty not allowed
self.failUnlessRaises(ValueError, Commit.iter_items, self.rorepo, 'master', pretty="raw")
-
+
def test_rev_list_bisect_all(self):
"""
'git rev-list --bisect-all' returns additional information
in the commit header. This test ensures that we properly parse it.
"""
revs = self.rorepo.git.rev_list('933d23bf95a5bd1624fbcdf328d904e1fa173474',
- first_parent=True,
- bisect_all=True)
+ first_parent=True,
+ bisect_all=True)
commits = Commit._iter_from_process_or_stream(self.rorepo, StringProcessAdapter(revs))
expected_ids = (
@@ -209,10 +209,11 @@ class TestCommit(TestObjectBase):
assert_equal(sha1, commit.hexsha)
def test_count(self):
- assert self.rorepo.tag('refs/tags/0.1.5').commit.count( ) == 143
-
+ assert self.rorepo.tag('refs/tags/0.1.5').commit.count() == 143
+
def test_list(self):
- assert isinstance(Commit.list_items(self.rorepo, '0.1.5', max_count=5)[hex_to_bin('5117c9c8a4d3af19a9958677e45cda9269de1541')], Commit)
+ assert isinstance(Commit.list_items(self.rorepo, '0.1.5', max_count=5)[
+ hex_to_bin('5117c9c8a4d3af19a9958677e45cda9269de1541')], Commit)
def test_str(self):
commit = Commit(self.rorepo, Commit.NULL_BIN_SHA)
@@ -225,10 +226,10 @@ class TestCommit(TestObjectBase):
def test_equality(self):
commit1 = Commit(self.rorepo, Commit.NULL_BIN_SHA)
commit2 = Commit(self.rorepo, Commit.NULL_BIN_SHA)
- commit3 = Commit(self.rorepo, "\1"*20)
+ commit3 = Commit(self.rorepo, "\1" * 20)
assert_equal(commit1, commit2)
assert_not_equal(commit2, commit3)
-
+
def test_iter_parents(self):
# should return all but ourselves, even if skip is defined
c = self.rorepo.commit('0.1.5')
@@ -237,43 +238,42 @@ class TestCommit(TestObjectBase):
first_parent = piter.next()
assert first_parent != c
assert first_parent == c.parents[0]
- # END for each
-
+ # END for each
+
def test_base(self):
name_rev = self.rorepo.head.commit.name_rev
assert isinstance(name_rev, basestring)
-
+
@with_rw_repo('HEAD', bare=True)
def test_serialization(self, rwrepo):
# create all commits of our repo
assert_commit_serialization(rwrepo, '0.1.6')
-
+
def test_serialization_unicode_support(self):
assert Commit.default_encoding.lower() == 'utf-8'
-
+
# create a commit with unicode in the message, and the author's name
# Verify its serialization and deserialization
cmt = self.rorepo.commit('0.1.6')
assert isinstance(cmt.message, unicode) # it automatically decodes it as such
- assert isinstance(cmt.author.name, unicode) # same here
-
+ assert isinstance(cmt.author.name, unicode) # same here
+
cmt.message = "üäêèß".decode("utf-8")
assert len(cmt.message) == 5
-
+
cmt.author.name = "äüß".decode("utf-8")
assert len(cmt.author.name) == 3
-
+
cstream = StringIO()
cmt._serialize(cstream)
cstream.seek(0)
assert len(cstream.getvalue())
-
+
ncmt = Commit(self.rorepo, cmt.binsha)
ncmt._deserialize(cstream)
-
+
assert cmt.author.name == ncmt.author.name
assert cmt.message == ncmt.message
# actually, it can't be printed in a shell as repr wants to have ascii only
# it appears
cmt.author.__repr__()
-
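
test_traversal above fixes the visiting order: branch_first yields both parents before any grandparent, while depth-first follows the first parent chain to the bottom before backtracking. The same ordering in a minimal walk over a plain parent mapping - a sketch of the asserted semantics, not of GitPython's traverse implementation:

def walk(parents, start, branch_first=True):
    # branch-first appends newly found parents at the back (breadth
    # first); otherwise they are prepended and explored immediately
    # (depth first). The start node itself is not yielded.
    pending = list(parents.get(start, ()))
    seen = set()
    while pending:
        node = pending.pop(0)
        if node in seen:
            continue
        seen.add(node)
        yield node
        if branch_first:
            pending.extend(parents.get(node, ()))
        else:
            pending = list(parents.get(node, ())) + pending

history = {'start': ['p0', 'p1'], 'p0': ['p00'], 'p1': ['p10']}
assert list(walk(history, 'start', branch_first=True)) == ['p0', 'p1', 'p00', 'p10']
assert list(walk(history, 'start', branch_first=False)) == ['p0', 'p00', 'p1', 'p10']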
diff --git a/git/test/objects/test_submodule.py b/git/test/objects/test_submodule.py
index bfafb150..650bd706 100644
--- a/git/test/objects/test_submodule.py
+++ b/git/test/objects/test_submodule.py
@@ -22,18 +22,21 @@ if sys.platform == 'win32':
smmap.util.MapRegion._test_read_into_memory = True
except ImportError:
sys.stderr.write("The submodule tests will fail as some files cannot be removed due to open file handles.\n")
- sys.stderr.write("The latest version of gitdb uses a memory map manager which can be configured to work around this problem")
-#END handle windows platform
+ sys.stderr.write(
+ "The latest version of gitdb uses a memory map manager which can be configured to work around this problem")
+# END handle windows platform
class TestRootProgress(RootUpdateProgress):
+
"""Just prints messages, for now without checking the correctness of the states"""
-
+
def update(self, op, index, max_count, message='', input=''):
print message
-
+
prog = TestRootProgress()
+
class TestSubmodule(TestObjectBase):
k_subm_current = "468cad66ff1f80ddaeee4123c24e4d53a032c00d"
@@ -41,7 +44,7 @@ class TestSubmodule(TestObjectBase):
k_no_subm_tag = "0.1.6"
k_github_gitdb_url = 'git://github.com/gitpython-developers/gitdb.git'
env_gitdb_local_path = "GITPYTHON_TEST_GITDB_LOCAL_PATH"
-
+
def _generate_async_local_path(self):
return to_native_path_linux(join_path_native(self.rorepo.working_tree_dir, 'git/ext/async'))
@@ -57,25 +60,26 @@ class TestSubmodule(TestObjectBase):
assert smgitdb.config_reader().get_value('url') == new_smclone_path
assert smgitdb.url == new_smclone_path
else:
- sys.stderr.write("Submodule tests need the gitdb repository. You can specify a local source setting the %s environment variable. Otherwise it will be downloaded from the internet" % self.env_gitdb_local_path)
- #END handle submodule path
+ sys.stderr.write(
+                "Submodule tests need the gitdb repository. You can specify a local source by setting the %s environment variable. Otherwise it will be downloaded from the internet" % self.env_gitdb_local_path)
+ # END handle submodule path
return new_smclone_path
def _do_base_tests(self, rwrepo):
"""Perform all tests in the given repository, it may be bare or nonbare"""
# manual instantiation
- smm = Submodule(rwrepo, "\0"*20)
+ smm = Submodule(rwrepo, "\0" * 20)
# name needs to be set in advance
- self.failUnlessRaises(AttributeError, getattr, smm, 'name')
-
+ self.failUnlessRaises(AttributeError, getattr, smm, 'name')
+
# iterate - 1 submodule
sms = Submodule.list_items(rwrepo, self.k_subm_current)
assert len(sms) == 1
sm = sms[0]
-
+
        # at a different time, there are none
assert len(Submodule.list_items(rwrepo, self.k_no_subm_tag)) == 0
-
+
assert sm.path == 'git/ext/gitdb'
assert sm.path != sm.name # in our case, we have ids there, which don't equal the path
assert sm.url == self.k_github_gitdb_url
@@ -86,55 +90,55 @@ class TestSubmodule(TestObjectBase):
assert sm.size == 0
# the module is not checked-out yet
self.failUnlessRaises(InvalidGitRepositoryError, sm.module)
-
+
# which is why we can't get the branch either - it points into the module() repository
self.failUnlessRaises(InvalidGitRepositoryError, getattr, sm, 'branch')
-
+
# branch_path works, as its just a string
assert isinstance(sm.branch_path, basestring)
-
+
        # some commits earlier we still have a submodule, but it's at a different commit
smold = Submodule.iter_items(rwrepo, self.k_subm_changed).next()
assert smold.binsha != sm.binsha
assert smold != sm # the name changed
-
+
# force it to reread its information
del(smold._url)
        assert smold.url == sm.url
-
+
# test config_reader/writer methods
sm.config_reader()
- new_smclone_path = None # keep custom paths for later
- new_csmclone_path = None #
+ new_smclone_path = None # keep custom paths for later
+ new_csmclone_path = None #
if rwrepo.bare:
self.failUnlessRaises(InvalidGitRepositoryError, sm.config_writer)
else:
# for faster checkout, set the url to the local path
# Note: This is nice but doesn't work anymore with the latest git-python
- # version. This would also mean we need internet for this to work which
+ # version. This would also mean we need internet for this to work which
# is why we allow an override using an environment variable
new_smclone_path = self._rewrite_gitdb_to_local_path(sm)
# END handle bare repo
smold.config_reader()
-
+
# cannot get a writer on historical submodules
if not rwrepo.bare:
self.failUnlessRaises(ValueError, smold.config_writer)
# END handle bare repo
-
+
# make the old into a new - this doesn't work as the name changed
prev_parent_commit = smold.parent_commit
self.failUnlessRaises(ValueError, smold.set_parent_commit, self.k_subm_current)
# the sha is properly updated
- smold.set_parent_commit(self.k_subm_changed+"~1")
+ smold.set_parent_commit(self.k_subm_changed + "~1")
assert smold.binsha != sm.binsha
-
- # raises if the sm didn't exist in new parent - it keeps its
+
+ # raises if the sm didn't exist in new parent - it keeps its
# parent_commit unchanged
self.failUnlessRaises(ValueError, smold.set_parent_commit, self.k_no_subm_tag)
-
+
        # TEST TODO: if a path is in the gitmodules file, but not in the index, it raises
-
+
# TEST UPDATE
##############
# module retrieval is not always possible
@@ -146,108 +150,106 @@ class TestSubmodule(TestObjectBase):
            # it's not checked out in our case
self.failUnlessRaises(InvalidGitRepositoryError, sm.module)
assert not sm.module_exists()
-
+
# currently there is only one submodule
assert len(list(rwrepo.iter_submodules())) == 1
- assert sm.binsha != "\0"*20
-
+ assert sm.binsha != "\0" * 20
+
# TEST ADD
###########
# preliminary tests
# adding existing returns exactly the existing
sma = Submodule.add(rwrepo, sm.name, sm.path)
assert sma.path == sm.path
-
+
# no url and no module at path fails
self.failUnlessRaises(ValueError, Submodule.add, rwrepo, "newsubm", "pathtorepo", url=None)
-
+
# CONTINUE UPDATE
#################
-
+
# lets update it - its a recursive one too
newdir = os.path.join(sm.abspath, 'dir')
os.makedirs(newdir)
-
+
# update fails if the path already exists non-empty
self.failUnlessRaises(OSError, sm.update)
os.rmdir(newdir)
-
+
# dry-run does nothing
sm.update(dry_run=True, progress=prog)
assert not sm.module_exists()
-
+
assert sm.update() is sm
sm_repopath = sm.path # cache for later
assert sm.module_exists()
assert isinstance(sm.module(), git.Repo)
assert sm.module().working_tree_dir == sm.abspath
-
+
# INTERLEAVE ADD TEST
#####################
# url must match the one in the existing repository ( if submodule name suggests a new one )
# or we raise
self.failUnlessRaises(ValueError, Submodule.add, rwrepo, "newsubm", sm.path, "git://someurl/repo.git")
-
-
+
# CONTINUE UPDATE
#################
        # we should have set up a tracking branch, which is also active
assert sm.module().head.ref.tracking_branch() is not None
-
+
# delete the whole directory and re-initialize
shutil.rmtree(sm.abspath)
assert len(sm.children()) == 0
# dry-run does nothing
sm.update(dry_run=True, recursive=False, progress=prog)
assert len(sm.children()) == 0
-
+
sm.update(recursive=False)
assert len(list(rwrepo.iter_submodules())) == 2
        assert len(sm.children()) == 1  # it's not checked out yet
csm = sm.children()[0]
assert not csm.module_exists()
csm_repopath = csm.path
-
+
        # adjust the path of the submodule's module to point to the local destination
# In the current gitpython version, async is used directly by gitpython
new_csmclone_path = self._generate_async_local_path()
csm.config_writer().set_value('url', new_csmclone_path)
assert csm.url == new_csmclone_path
-
+
# dry-run does nothing
assert not csm.module_exists()
sm.update(recursive=True, dry_run=True, progress=prog)
assert not csm.module_exists()
-
+
# update recursively again
sm.update(recursive=True)
assert csm.module_exists()
-
+
# tracking branch once again
        assert csm.module().head.ref.tracking_branch() is not None
-
+
# this flushed in a sub-submodule
assert len(list(rwrepo.iter_submodules())) == 2
-
-
+
# reset both heads to the previous version, verify that to_latest_revision works
smods = (sm.module(), csm.module())
for repo in smods:
repo.head.reset('HEAD~2', working_tree=1)
# END for each repo to reset
-
- # dry run does nothing
+
+ # dry run does nothing
sm.update(recursive=True, dry_run=True, progress=prog)
for repo in smods:
assert repo.head.commit != repo.head.ref.tracking_branch().commit
# END for each repo to check
-
+
sm.update(recursive=True, to_latest_revision=True)
for repo in smods:
assert repo.head.commit == repo.head.ref.tracking_branch().commit
# END for each repo to check
del(smods)
-
+
# if the head is detached, it still works ( but warns )
smref = sm.module().head.ref
sm.module().head.ref = 'HEAD~1'
@@ -255,15 +257,15 @@ class TestSubmodule(TestObjectBase):
csm_tracking_branch = csm.module().head.ref.tracking_branch()
csm.module().head.ref.set_tracking_branch(None)
sm.update(recursive=True, to_latest_revision=True)
-
+
# to_latest_revision changes the child submodule's commit, it needs an
# update now
csm.set_parent_commit(csm.repo.head.commit)
-
+
# undo the changes
sm.module().head.ref = smref
csm.module().head.ref.set_tracking_branch(csm_tracking_branch)
-
+
        # REMOVAL OF REPOSITORY
########################
# must delete something
@@ -281,24 +283,24 @@ class TestSubmodule(TestObjectBase):
# still, we have the file modified
self.failUnlessRaises(InvalidGitRepositoryError, sm.remove, dry_run=True)
sm.module().index.reset(working_tree=True)
-
+
# make sure sub-submodule is not modified by forcing it to update
# to the revision it is supposed to point to.
for subitem in sm.traverse():
subitem.update()
- #END checkout to right commit
-
+ # END checkout to right commit
+
# this would work
assert sm.remove(dry_run=True) is sm
assert sm.module_exists()
sm.remove(force=True, dry_run=True)
assert sm.module_exists()
-
+
# but ... we have untracked files in the child submodule
fn = join_path_native(csm.module().working_tree_dir, "newfile")
open(fn, 'w').write("hi")
self.failUnlessRaises(InvalidGitRepositoryError, sm.remove)
-
+
# forcibly delete the child repository
prev_count = len(sm.children())
assert csm.remove(force=True) is csm
@@ -308,62 +310,62 @@ class TestSubmodule(TestObjectBase):
# now we have a changed index, as configuration was altered.
# fix this
sm.module().index.reset(working_tree=True)
-
+
# now delete only the module of the main submodule
assert sm.module_exists()
sm.remove(configuration=False)
assert sm.exists()
assert not sm.module_exists()
assert sm.config_reader().get_value('url')
-
+
# delete the rest
sm.remove()
assert not sm.exists()
assert not sm.module_exists()
-
+
assert len(rwrepo.submodules) == 0
-
+
# ADD NEW SUBMODULE
###################
# add a simple remote repo - trailing slashes are no problem
smid = "newsub"
osmid = "othersub"
- nsm = Submodule.add(rwrepo, smid, sm_repopath, new_smclone_path+"/", None, no_checkout=True)
+ nsm = Submodule.add(rwrepo, smid, sm_repopath, new_smclone_path + "/", None, no_checkout=True)
assert nsm.name == smid
assert nsm.module_exists()
assert nsm.exists()
        # it's not checked out
assert not os.path.isfile(join_path_native(nsm.module().working_tree_dir, Submodule.k_modules_file))
assert len(rwrepo.submodules) == 1
-
+
        # add another submodule, but into the root, not as a submodule
osm = Submodule.add(rwrepo, osmid, csm_repopath, new_csmclone_path, Submodule.k_head_default)
assert osm != nsm
assert osm.module_exists()
assert osm.exists()
assert os.path.isfile(join_path_native(osm.module().working_tree_dir, 'setup.py'))
-
+
assert len(rwrepo.submodules) == 2
-
+
# commit the changes, just to finalize the operation
rwrepo.index.commit("my submod commit")
assert len(rwrepo.submodules) == 2
-
- # needs update as the head changed, it thinks its in the history
+
+        # needs update as the head changed, it thinks it's in the history
# of the repo otherwise
nsm.set_parent_commit(rwrepo.head.commit)
osm.set_parent_commit(rwrepo.head.commit)
-
+
# MOVE MODULE
#############
        # invalid input
self.failUnlessRaises(ValueError, nsm.move, 'doesntmatter', module=False, configuration=False)
-
+
# renaming to the same path does nothing
assert nsm.move(sm.path) is nsm
-
+
# rename a module
- nmp = join_path_native("new", "module", "dir") + "/" # new module path
+ nmp = join_path_native("new", "module", "dir") + "/" # new module path
pmp = nsm.path
abspmp = nsm.abspath
assert nsm.move(nmp) is nsm
@@ -371,43 +373,43 @@ class TestSubmodule(TestObjectBase):
nmpl = to_native_path_linux(nmp)
assert nsm.path == nmpl
assert rwrepo.submodules[0].path == nmpl
-
+
mpath = 'newsubmodule'
absmpath = join_path_native(rwrepo.working_tree_dir, mpath)
open(absmpath, 'w').write('')
self.failUnlessRaises(ValueError, nsm.move, mpath)
os.remove(absmpath)
-
+
# now it works, as we just move it back
nsm.move(pmp)
assert nsm.path == pmp
assert rwrepo.submodules[0].path == pmp
-
+
# TODO lowprio: test remaining exceptions ... for now its okay, the code looks right
-
+
# REMOVE 'EM ALL
################
# if a submodule's repo has no remotes, it can't be added without an explicit url
osmod = osm.module()
-
+
osm.remove(module=False)
for remote in osmod.remotes:
remote.remove(osmod, remote.name)
assert not osm.exists()
- self.failUnlessRaises(ValueError, Submodule.add, rwrepo, osmid, csm_repopath, url=None)
+ self.failUnlessRaises(ValueError, Submodule.add, rwrepo, osmid, csm_repopath, url=None)
# END handle bare mode
-
+
# Error if there is no submodule file here
self.failUnlessRaises(IOError, Submodule._config_parser, rwrepo, rwrepo.commit(self.k_no_subm_tag), True)
-
+
@with_rw_repo(k_subm_current)
def test_base_rw(self, rwrepo):
self._do_base_tests(rwrepo)
-
+
@with_rw_repo(k_subm_current, bare=True)
def test_base_bare(self, rwrepo):
self._do_base_tests(rwrepo)
-
+
@with_rw_repo(k_subm_current, bare=False)
def test_root_module(self, rwrepo):
# Can query everything without problems
@@ -415,7 +417,7 @@ class TestSubmodule(TestObjectBase):
# test new constructor
assert rm.parent_commit == RootModule(self.rorepo, self.rorepo.commit(self.k_subm_current)).parent_commit
assert rm.module() is rwrepo
-
+
# try attributes
rm.binsha
rm.mode
@@ -424,24 +426,24 @@ class TestSubmodule(TestObjectBase):
assert rm.parent_commit == self.rorepo.commit(self.k_subm_current)
rm.url
rm.branch
-
+
assert len(rm.list_items(rm.module())) == 1
rm.config_reader()
rm.config_writer()
-
+
# deep traversal git / async
rsmsp = [sm.path for sm in rm.traverse()]
        assert len(rsmsp) == 1  # gitdb only - it's not yet up to date, so it has no submodule
-
+
# cannot set the parent commit as root module's path didn't exist
self.failUnlessRaises(ValueError, rm.set_parent_commit, 'HEAD')
-
+
# TEST UPDATE
#############
# setup commit which remove existing, add new and modify existing submodules
rm = RootModule(rwrepo)
assert len(rm.children()) == 1
-
+
# modify path without modifying the index entry
# ( which is what the move method would do properly )
#==================================================
@@ -450,37 +452,37 @@ class TestSubmodule(TestObjectBase):
fp = join_path_native(pp, sm.path)
prep = sm.path
assert not sm.module_exists() # was never updated after rwrepo's clone
-
+
# assure we clone from a local source
self._rewrite_gitdb_to_local_path(sm)
-
+
# dry-run does nothing
sm.update(recursive=False, dry_run=True, progress=prog)
assert not sm.module_exists()
-
+
sm.update(recursive=False)
assert sm.module_exists()
sm.config_writer().set_value('path', fp) # change path to something with prefix AFTER url change
-
+
        # update fails as list_items in such a situation cannot work, as it cannot
# find the entry at the changed path
self.failUnlessRaises(InvalidGitRepositoryError, rm.update, recursive=False)
-
+
        # move it properly - doesn't work as its path currently points to an indexentry
# which doesn't exist ( move it to some path, it doesn't matter here )
self.failUnlessRaises(InvalidGitRepositoryError, sm.move, pp)
# reset the path(cache) to where it was, now it works
sm.path = prep
sm.move(fp, module=False) # leave it at the old location
-
+
assert not sm.module_exists()
- cpathchange = rwrepo.index.commit("changed sm path") # finally we can commit
-
+ cpathchange = rwrepo.index.commit("changed sm path") # finally we can commit
+
# update puts the module into place
rm.update(recursive=False, progress=prog)
sm.set_parent_commit(cpathchange)
assert sm.module_exists()
-
+
# add submodule
#================
nsmn = "newsubmodule"
@@ -494,17 +496,14 @@ class TestSubmodule(TestObjectBase):
# repo and a new submodule comes into life
nsm.remove(configuration=False, module=True)
assert not nsm.module_exists() and nsm.exists()
-
-
+
# dry-run does nothing
rm.update(recursive=False, dry_run=True, progress=prog)
-
+
# otherwise it will work
rm.update(recursive=False, progress=prog)
assert nsm.module_exists()
-
-
-
+
# remove submodule - the previous one
#====================================
sm.set_parent_commit(csmadded)
@@ -512,49 +511,48 @@ class TestSubmodule(TestObjectBase):
assert not sm.remove(module=False).exists()
assert os.path.isdir(smp) # module still exists
csmremoved = rwrepo.index.commit("Removed submodule")
-
+
# an update will remove the module
# not in dry_run
rm.update(recursive=False, dry_run=True)
assert os.path.isdir(smp)
-
+
rm.update(recursive=False)
assert not os.path.isdir(smp)
-
-
- # change url
+
+ # change url
#=============
- # to the first repository, this way we have a fast checkout, and a completely different
+ # to the first repository, this way we have a fast checkout, and a completely different
# repository at the different url
nsm.set_parent_commit(csmremoved)
nsmurl = os.environ.get(self.env_gitdb_local_path, self.k_github_gitdb_url)
-
- # Note: We would have liked to have a different url, but we cannot
+
+ # Note: We would have liked to have a different url, but we cannot
# provoke this case
assert nsm.url != nsmurl
nsm.config_writer().set_value('url', nsmurl)
csmpathchange = rwrepo.index.commit("changed url")
nsm.set_parent_commit(csmpathchange)
-
+
prev_commit = nsm.module().head.commit
# dry-run does nothing
rm.update(recursive=False, dry_run=True, progress=prog)
assert nsm.module().remotes.origin.url != nsmurl
-
+
rm.update(recursive=False, progress=prog)
assert nsm.module().remotes.origin.url == nsmurl
# head changed, as the remote url and its commit changed
assert prev_commit != nsm.module().head.commit
-
+
# add the submodule's changed commit to the index, which is what the
# user would do
# beforehand, update our instance's binsha with the new one
nsm.binsha = nsm.module().head.commit.binsha
rwrepo.index.add([nsm])
-
+
# change branch
#=================
- # we only have one branch, so we switch to a virtual one, and back
+ # we only have one branch, so we switch to a virtual one, and back
# to the current one to trigger the difference
cur_branch = nsm.branch
nsmm = nsm.module()
@@ -564,33 +562,32 @@ class TestSubmodule(TestObjectBase):
csmbranchchange = rwrepo.index.commit("changed branch to %s" % branch)
nsm.set_parent_commit(csmbranchchange)
# END for each branch to change
-
+
# Lets remove our tracking branch to simulate some changes
nsmmh = nsmm.head
assert nsmmh.ref.tracking_branch() is None # never set it up until now
assert not nsmmh.is_detached
-
- #dry run does nothing
+
+ # dry run does nothing
rm.update(recursive=False, dry_run=True, progress=prog)
assert nsmmh.ref.tracking_branch() is None
-
+
# the real thing does
rm.update(recursive=False, progress=prog)
-
+
assert nsmmh.ref.tracking_branch() is not None
assert not nsmmh.is_detached
-
+
# recursive update
# =================
# finally we recursively update a module, just to run the code at least once
# remove the module so that it has more work
- assert len(nsm.children()) >= 1 # could include smmap
+ assert len(nsm.children()) >= 1 # could include smmap
assert nsm.exists() and nsm.module_exists() and len(nsm.children()) >= 1
# assure we pull locally only
- nsmc = nsm.children()[0]
+ nsmc = nsm.children()[0]
nsmc.config_writer().set_value('url', async_url)
rm.update(recursive=True, progress=prog, dry_run=True) # just to run the code
rm.update(recursive=True, progress=prog)
-
+
assert len(nsm.children()) >= 2 and nsmc.module_exists()
-
diff --git a/git/test/objects/test_tree.py b/git/test/objects/test_tree.py
index 6317f4db..00cca5e4 100644
--- a/git/test/objects/test_tree.py
+++ b/git/test/objects/test_tree.py
@@ -7,16 +7,17 @@
from lib import *
from git.objects.fun import (
- traverse_tree_recursive,
- traverse_trees_recursive
- )
+ traverse_tree_recursive,
+ traverse_trees_recursive
+)
from git.objects.blob import Blob
from git.objects.tree import Tree
from cStringIO import StringIO
import os
+
class TestTree(TestObjectBase):
-
+
def test_serializable(self):
# tree at the given commit contains a submodule as well
roottree = self.rorepo.tree('6c1faef799095f3990e9970bc2cb10aa0221cf9c')
@@ -27,75 +28,74 @@ class TestTree(TestObjectBase):
tree = item
# trees have no dict
self.failUnlessRaises(AttributeError, setattr, tree, 'someattr', 1)
-
+
orig_data = tree.data_stream.read()
orig_cache = tree._cache
-
+
stream = StringIO()
tree._serialize(stream)
assert stream.getvalue() == orig_data
-
+
stream.seek(0)
testtree = Tree(self.rorepo, Tree.NULL_BIN_SHA, 0, '')
testtree._deserialize(stream)
assert testtree._cache == orig_cache
-
-
+
# TEST CACHE MUTATOR
mod = testtree.cache
self.failUnlessRaises(ValueError, mod.add, "invalid sha", 0, "name")
self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, 0, "invalid mode")
self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, tree.mode, "invalid/name")
-
+
# add new item
name = "fake_dir"
mod.add(testtree.NULL_HEX_SHA, tree.mode, name)
assert name in testtree
-
+
            # it's available in the tree immediately
assert isinstance(testtree[name], Tree)
-
+
            # adding it again will not cause multiple of them to be present
cur_count = len(testtree)
mod.add(testtree.NULL_HEX_SHA, tree.mode, name)
assert len(testtree) == cur_count
-
+
# fails with a different sha - name exists
- hexsha = "1"*40
+ hexsha = "1" * 40
self.failUnlessRaises(ValueError, mod.add, hexsha, tree.mode, name)
-
+
# force it - replace existing one
mod.add(hexsha, tree.mode, name, force=True)
assert testtree[name].hexsha == hexsha
assert len(testtree) == cur_count
-
+
# unchecked addition always works, even with invalid items
invalid_name = "hi/there"
mod.add_unchecked(hexsha, 0, invalid_name)
assert len(testtree) == cur_count + 1
-
+
del(mod[invalid_name])
assert len(testtree) == cur_count
            # del again, it's fine
del(mod[invalid_name])
-
+
# have added one item, we are done
mod.set_done()
mod.set_done() # multiple times are okay
-
+
            # serialize, it's different now
stream = StringIO()
testtree._serialize(stream)
stream.seek(0)
assert stream.getvalue() != orig_data
-
+
# replaces cache, but we make sure of it
del(testtree._cache)
testtree._deserialize(stream)
assert name in testtree
assert invalid_name not in testtree
# END for each item in tree
-
+
def test_traverse(self):
root = self.rorepo.tree('0.1.6')
num_recursive = 0
@@ -103,34 +103,34 @@ class TestTree(TestObjectBase):
for obj in root.traverse():
if "/" in obj.path:
num_recursive += 1
-
+
assert isinstance(obj, (Blob, Tree))
all_items.append(obj)
# END for each object
assert all_items == root.list_traverse()
-
+
# limit recursion level to 0 - should be same as default iteration
assert all_items
assert 'CHANGES' in root
assert len(list(root)) == len(list(root.traverse(depth=1)))
-
+
# only choose trees
- trees_only = lambda i,d: i.type == "tree"
- trees = list(root.traverse(predicate = trees_only))
- assert len(trees) == len(list( i for i in root.traverse() if trees_only(i,0) ))
-
+ trees_only = lambda i, d: i.type == "tree"
+ trees = list(root.traverse(predicate=trees_only))
+ assert len(trees) == len(list(i for i in root.traverse() if trees_only(i, 0)))
+
# test prune
- lib_folder = lambda t,d: t.path == "lib"
- pruned_trees = list(root.traverse(predicate = trees_only,prune = lib_folder))
+ lib_folder = lambda t, d: t.path == "lib"
+ pruned_trees = list(root.traverse(predicate=trees_only, prune=lib_folder))
assert len(pruned_trees) < len(trees)
-
+
# trees and blobs
- assert len(set(trees)|set(root.trees)) == len(trees)
- assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len( root.blobs )
+ assert len(set(trees) | set(root.trees)) == len(trees)
+ assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len(root.blobs)
subitem = trees[0][0]
assert "/" in subitem.path
assert subitem.name == os.path.basename(subitem.path)
-
+
# assure that at some point the traversed paths have a slash in them
found_slash = False
for item in root.traverse():
@@ -138,9 +138,8 @@ class TestTree(TestObjectBase):
if '/' in item.path:
found_slash = True
# END check for slash
-
- # slashes in paths are supported as well
- assert root[item.path] == item == root/item.path
+
+ # slashes in paths are supported as well
+ assert root[item.path] == item == root / item.path
# END for each item
assert found_slash
-
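
test_traverse above combines two (item, depth) callables: predicate filters what gets yielded, while prune cuts a whole subtree before it is descended into. Their division of labour in a minimal recursive walk, with a nested dict standing in for Tree objects:

def traverse(tree, predicate=lambda i, d: True,
             prune=lambda i, d: False, depth=0):
    # prune drops an entry and its whole subtree before descent;
    # predicate only decides whether an entry is yielded
    for name, child in tree.items():
        if prune(name, depth):
            continue
        if predicate(name, depth):
            yield name
        if isinstance(child, dict):
            for sub in traverse(child, predicate, prune, depth + 1):
                yield sub

root = {'lib': {'util.py': None}, 'CHANGES': None}
assert 'util.py' in list(traverse(root))
assert 'util.py' not in list(traverse(root, prune=lambda n, d: n == 'lib'))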
diff --git a/git/test/performance/db/__init__.py b/git/test/performance/db/__init__.py
index 8b137891..e69de29b 100644
--- a/git/test/performance/db/__init__.py
+++ b/git/test/performance/db/__init__.py
@@ -1 +0,0 @@
-
diff --git a/git/test/performance/db/looseodb_impl.py b/git/test/performance/db/looseodb_impl.py
index 1da69945..6cdbaa32 100644
--- a/git/test/performance/db/looseodb_impl.py
+++ b/git/test/performance/db/looseodb_impl.py
@@ -4,9 +4,9 @@ from git.base import *
from git.stream import *
from async import ChannelThreadTask
from git.util import (
- pool,
- bin_to_hex
- )
+ pool,
+ bin_to_hex
+)
import os
import sys
from time import time
@@ -15,7 +15,7 @@ from git.test.lib import (
GlobalsItemDeletorMetaCls,
make_memory_file,
with_rw_repo
- )
+)
from git.test.performance.lib import TestBigRepoR
@@ -32,16 +32,18 @@ def read_chunked_stream(stream):
# END read stream loop
assert total == stream.size
return stream
-
-
+
+
class TestStreamReader(ChannelThreadTask):
+
"""Expects input streams and reads them in chunks. It will read one at a time,
    requiring a queue chunk of size 1"""
+
def __init__(self, *args):
super(TestStreamReader, self).__init__(*args)
self.fun = read_chunked_stream
self.max_chunksize = 1
-
+
#} END utilities
@@ -51,29 +53,29 @@ class PerfBaseDeletorMetaClass(GlobalsItemDeletorMetaCls):
class TestLooseDBWPerformanceBase(TestBigRepoR):
__metaclass__ = PerfBaseDeletorMetaClass
-
- large_data_size_bytes = 1000*1000*10 # some MiB should do it
- moderate_data_size_bytes = 1000*1000*1 # just 1 MiB
-
+
+ large_data_size_bytes = 1000 * 1000 * 10 # some MiB should do it
+ moderate_data_size_bytes = 1000 * 1000 * 1 # just 1 MiB
+
#{ Configuration
LooseODBCls = None
#} END configuration
-
+
@classmethod
def setUp(cls):
super(TestLooseDBWPerformanceBase, cls).setUp()
if cls.LooseODBCls is None:
raise AssertionError("LooseODBCls must be set in subtype")
- #END assert configuration
+ # END assert configuration
# currently there is no additional configuration
-
+
@with_rw_repo("HEAD")
def test_large_data_streaming(self, rwrepo):
# TODO: This part overlaps with the same file in git.test.performance.test_stream
# It should be shared if possible
objects_path = rwrepo.db_path('')
ldb = self.LooseODBCls(objects_path)
-
+
for randomize in range(2):
desc = (randomize and 'random ') or ''
print >> sys.stderr, "Creating %s data ..." % desc
@@ -81,8 +83,8 @@ class TestLooseDBWPerformanceBase(TestBigRepoR):
size, stream = make_memory_file(self.large_data_size_bytes, randomize)
elapsed = time() - st
print >> sys.stderr, "Done (in %f s)" % elapsed
-
- # writing - due to the compression it will seem faster than it is
+
+ # writing - due to the compression it will seem faster than it is
st = time()
binsha = ldb.store(IStream('blob', size, stream)).binsha
elapsed_add = time() - st
@@ -90,24 +92,24 @@ class TestLooseDBWPerformanceBase(TestBigRepoR):
hexsha = bin_to_hex(binsha)
db_file = os.path.join(objects_path, hexsha[:2], hexsha[2:])
fsize_kib = os.path.getsize(db_file) / 1000
-
-
+
size_kib = size / 1000
- print >> sys.stderr, "%s: Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (self.LooseODBCls.__name__, size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add)
-
+ print >> sys.stderr, "%s: Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (
+ self.LooseODBCls.__name__, size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add)
+
# reading all at once
st = time()
ostream = ldb.stream(binsha)
shadata = ostream.read()
elapsed_readall = time() - st
-
+
stream.seek(0)
assert shadata == stream.getvalue()
- print >> sys.stderr, "%s: Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (self.LooseODBCls.__name__, size_kib, desc, elapsed_readall, size_kib / elapsed_readall)
-
-
+ print >> sys.stderr, "%s: Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (
+ self.LooseODBCls.__name__, size_kib, desc, elapsed_readall, size_kib / elapsed_readall)
+
            # reading in chunks of 512 kB
- cs = 512*1000
+ cs = 512 * 1000
chunks = list()
st = time()
ostream = ldb.stream(binsha)
@@ -118,15 +120,14 @@ class TestLooseDBWPerformanceBase(TestBigRepoR):
break
# END read in chunks
elapsed_readchunks = time() - st
-
+
stream.seek(0)
assert ''.join(chunks) == stream.getvalue()
-
+
cs_kib = cs / 1000
- print >> sys.stderr, "%s: Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (self.LooseODBCls.__name__, size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks)
-
+ print >> sys.stderr, "%s: Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (
+ self.LooseODBCls.__name__, size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks)
+
# del db file so git has something to do
os.remove(db_file)
# END for each randomization factor
-
-
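
Every phase of the loose-odb benchmark above repeats one skeleton: take a timestamp, run the operation, then report kilobytes per second on stderr (the suite divides byte counts by 1000). The skeleton in isolation - a sketch where the callable stands in for the store or stream calls being timed:

import sys
from time import time

def report_throughput(label, nbytes, operation):
    # time a single run of the operation and print a KiB/s figure
    # on stderr, mirroring the reporting style of the tests above
    st = time()
    operation()
    elapsed = time() - st
    kib = nbytes / 1000
    print >> sys.stderr, "%s: %i KiB in %f s ( %f KiB / s )" % (
        label, kib, elapsed, kib / elapsed)

A call such as report_throughput("write", size, lambda: ldb.store(IStream('blob', size, stream))) would reproduce the write measurement above, with ldb and IStream taken from the test.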
diff --git a/git/test/performance/db/odb_impl.py b/git/test/performance/db/odb_impl.py
index 887604c0..afe9a32b 100644
--- a/git/test/performance/db/odb_impl.py
+++ b/git/test/performance/db/odb_impl.py
@@ -7,31 +7,33 @@ import stat
from git.test.performance.lib import (
TestBigRepoR,
GlobalsItemDeletorMetaCls
- )
+)
+
class PerfBaseDeletorMetaClass(GlobalsItemDeletorMetaCls):
ModuleToDelete = 'TestObjDBPerformanceBase'
-
+
class TestObjDBPerformanceBase(TestBigRepoR):
__metaclass__ = PerfBaseDeletorMetaClass
-
- #{ Configuration
+
+ #{ Configuration
RepoCls = None # to be set by subclass
#} END configuration
-
+
def test_random_access_test(self):
repo = self.rorepo
-
+
# GET COMMITS
st = time()
root_commit = repo.commit(self.head_sha_2k)
commits = list(root_commit.traverse())
nc = len(commits)
elapsed = time() - st
-
- print >> sys.stderr, "%s: Retrieved %i commits from ObjectStore in %g s ( %f commits / s )" % (type(repo.odb), nc, elapsed, nc / elapsed)
-
+
+ print >> sys.stderr, "%s: Retrieved %i commits from ObjectStore in %g s ( %f commits / s )" % (
+ type(repo.odb), nc, elapsed, nc / elapsed)
+
# GET TREES
# walk all trees of all commits
st = time()
@@ -49,9 +51,10 @@ class TestObjDBPerformanceBase(TestBigRepoR):
blobs_per_commit.append(blobs)
# END for each commit
elapsed = time() - st
-
- print >> sys.stderr, "%s: Retrieved %i objects from %i commits in %g s ( %f objects / s )" % (type(repo.odb), nt, len(commits), elapsed, nt / elapsed)
-
+
+ print >> sys.stderr, "%s: Retrieved %i objects from %i commits in %g s ( %f objects / s )" % (
+ type(repo.odb), nt, len(commits), elapsed, nt / elapsed)
+
# GET BLOBS
st = time()
nb = 0
@@ -66,7 +69,6 @@ class TestObjDBPerformanceBase(TestBigRepoR):
break
# END for each bloblist
elapsed = time() - st
-
- print >> sys.stderr, "%s: Retrieved %i blob (%i KiB) and their data in %g s ( %f blobs / s, %f KiB / s )" % (type(repo.odb), nb, data_bytes/1000, elapsed, nb / elapsed, (data_bytes / 1000) / elapsed)
-
-
+
+        print >> sys.stderr, "%s: Retrieved %i blobs (%i KiB) and their data in %g s ( %f blobs / s, %f KiB / s )" % (
+ type(repo.odb), nb, data_bytes / 1000, elapsed, nb / elapsed, (data_bytes / 1000) / elapsed)
diff --git a/git/test/performance/db/packedodb_impl.py b/git/test/performance/db/packedodb_impl.py
index 23d00444..2aaf99a2 100644
--- a/git/test/performance/db/packedodb_impl.py
+++ b/git/test/performance/db/packedodb_impl.py
@@ -4,9 +4,9 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Performance tests for object store"""
from git.test.performance.lib import (
- TestBigRepoR,
+ TestBigRepoR,
GlobalsItemDeletorMetaCls
- )
+)
from git.exc import UnsupportedOperation
@@ -19,31 +19,32 @@ import random
class PerfBaseDeletorMetaClass(GlobalsItemDeletorMetaCls):
ModuleToDelete = 'TestPurePackedODBPerformanceBase'
+
class TestPurePackedODBPerformanceBase(TestBigRepoR):
__metaclass__ = PerfBaseDeletorMetaClass
-
+
#{ Configuration
PackedODBCls = None
#} END configuration
-
+
@classmethod
def setUp(cls):
super(TestPurePackedODBPerformanceBase, cls).setUp()
if cls.PackedODBCls is None:
raise AssertionError("PackedODBCls must be set in subclass")
- #END assert configuration
+ # END assert configuration
cls.ropdb = cls.PackedODBCls(cls.rorepo.db_path("pack"))
-
+
def test_pack_random_access(self):
pdb = self.ropdb
-
+
# sha lookup
st = time()
sha_list = list(pdb.sha_iter())
elapsed = time() - st
ns = len(sha_list)
print >> sys.stderr, "PDB: looked up %i shas by index in %f s ( %f shas/s )" % (ns, elapsed, ns / elapsed)
-
+
        # sha lookup: best-case and worst-case access
pdb_pack_info = pdb._pack_info
# END shuffle shas
@@ -52,13 +53,14 @@ class TestPurePackedODBPerformanceBase(TestBigRepoR):
pdb_pack_info(sha)
# END for each sha to look up
elapsed = time() - st
-
+
# discard cache
del(pdb._entities)
pdb.entities()
- print >> sys.stderr, "PDB: looked up %i sha in %i packs in %f s ( %f shas/s )" % (ns, len(pdb.entities()), elapsed, ns / elapsed)
+ print >> sys.stderr, "PDB: looked up %i sha in %i packs in %f s ( %f shas/s )" % (
+ ns, len(pdb.entities()), elapsed, ns / elapsed)
# END for each random mode
-
+
# query info and streams only
max_items = 10000 # can wait longer when testing memory
for pdb_fun in (pdb.info, pdb.stream):
@@ -66,9 +68,10 @@ class TestPurePackedODBPerformanceBase(TestBigRepoR):
for sha in sha_list[:max_items]:
pdb_fun(sha)
elapsed = time() - st
- print >> sys.stderr, "PDB: Obtained %i object %s by sha in %f s ( %f items/s )" % (max_items, pdb_fun.__name__.upper(), elapsed, max_items / elapsed)
+ print >> sys.stderr, "PDB: Obtained %i object %s by sha in %f s ( %f items/s )" % (
+ max_items, pdb_fun.__name__.upper(), elapsed, max_items / elapsed)
# END for each function
-
+
# retrieve stream and read all
max_items = 5000
pdb_stream = pdb.stream
@@ -80,8 +83,9 @@ class TestPurePackedODBPerformanceBase(TestBigRepoR):
total_size += stream.size
elapsed = time() - st
total_kib = total_size / 1000
- print >> sys.stderr, "PDB: Obtained %i streams by sha and read all bytes totallying %i KiB ( %f KiB / s ) in %f s ( %f streams/s )" % (max_items, total_kib, total_kib/elapsed , elapsed, max_items / elapsed)
-
+        print >> sys.stderr, "PDB: Obtained %i streams by sha and read all bytes totalling %i KiB ( %f KiB / s ) in %f s ( %f streams/s )" % (
+ max_items, total_kib, total_kib / elapsed, elapsed, max_items / elapsed)
+
def test_correctness(self):
pdb = self.ropdb
# disabled for now as it used to work perfectly, checking big repositories takes a long time
@@ -102,6 +106,6 @@ class TestPurePackedODBPerformanceBase(TestBigRepoR):
# END for each index
# END for each entity
elapsed = time() - st
- print >> sys.stderr, "PDB: verified %i objects (crc=%i) in %f s ( %f objects/s )" % (count, crc, elapsed, count / elapsed)
+ print >> sys.stderr, "PDB: verified %i objects (crc=%i) in %f s ( %f objects/s )" % (
+ count, crc, elapsed, count / elapsed)
# END for each verify mode
-
diff --git a/git/test/performance/db/test_looseodb_cmd.py b/git/test/performance/db/test_looseodb_cmd.py
index 9147eff6..f96e4c3e 100644
--- a/git/test/performance/db/test_looseodb_cmd.py
+++ b/git/test/performance/db/test_looseodb_cmd.py
@@ -3,9 +3,10 @@ from looseodb_impl import TestLooseDBWPerformanceBase
import sys
+
class TestCmdLooseDB(TestLooseDBWPerformanceBase):
LooseODBCls = CmdCompatibilityGitDB
-
+
def test_info(self):
- sys.stderr.write("This test does not check the write performance of the git command as it is implemented in pure python")
-
+ sys.stderr.write(
+ "This test does not check the write performance of the git command as it is implemented in pure python")
diff --git a/git/test/performance/db/test_looseodb_dulwich.py b/git/test/performance/db/test_looseodb_dulwich.py
index 174be83d..e23327f7 100644
--- a/git/test/performance/db/test_looseodb_dulwich.py
+++ b/git/test/performance/db/test_looseodb_dulwich.py
@@ -2,12 +2,12 @@ try:
from git.db.dulwich.complex import DulwichGitODB
except ImportError:
from git.db.py.complex import PureGitODB as DulwichGitODB
-#END handle import
+# END handle import
from git.test.db.dulwich.lib import DulwichRequiredMetaMixin
from looseodb_impl import TestLooseDBWPerformanceBase
+
class TestPureLooseDB(TestLooseDBWPerformanceBase):
__metaclass__ = DulwichRequiredMetaMixin
LooseODBCls = DulwichGitODB
-
diff --git a/git/test/performance/db/test_looseodb_pure.py b/git/test/performance/db/test_looseodb_pure.py
index bb080612..bc4b54fe 100644
--- a/git/test/performance/db/test_looseodb_pure.py
+++ b/git/test/performance/db/test_looseodb_pure.py
@@ -1,6 +1,6 @@
from git.db.py.loose import PureLooseObjectODB
from looseodb_impl import TestLooseDBWPerformanceBase
+
class TestPureLooseDB(TestLooseDBWPerformanceBase):
LooseODBCls = PureLooseObjectODB
-
diff --git a/git/test/performance/db/test_looseodb_pygit2.py b/git/test/performance/db/test_looseodb_pygit2.py
index a9661111..06ece5c7 100644
--- a/git/test/performance/db/test_looseodb_pygit2.py
+++ b/git/test/performance/db/test_looseodb_pygit2.py
@@ -2,12 +2,12 @@ try:
from git.db.pygit2.complex import Pygit2GitODB
except ImportError:
from git.db.py.complex import PureGitODB as Pygit2GitODB
-#END handle import
+# END handle import
from git.test.db.pygit2.lib import Pygit2RequiredMetaMixin
from looseodb_impl import TestLooseDBWPerformanceBase
+
class TestPureLooseDB(TestLooseDBWPerformanceBase):
__metaclass__ = Pygit2RequiredMetaMixin
LooseODBCls = Pygit2GitODB
-
diff --git a/git/test/performance/db/test_odb_cmd.py b/git/test/performance/db/test_odb_cmd.py
index 37af34fd..a7dcfb0d 100644
--- a/git/test/performance/db/test_odb_cmd.py
+++ b/git/test/performance/db/test_odb_cmd.py
@@ -1,6 +1,6 @@
from git.db.complex import CmdCompatibilityGitDB
from odb_impl import TestObjDBPerformanceBase
+
class TestCmdDB(TestObjDBPerformanceBase):
RepoCls = CmdCompatibilityGitDB
-
diff --git a/git/test/performance/db/test_odb_dulwich.py b/git/test/performance/db/test_odb_dulwich.py
index 33abc88c..a5b8e57c 100644
--- a/git/test/performance/db/test_odb_dulwich.py
+++ b/git/test/performance/db/test_odb_dulwich.py
@@ -2,12 +2,12 @@ try:
from git.db.dulwich.complex import DulwichCompatibilityGitDB
except ImportError:
from git.db.complex import PureCompatibilityGitDB as DulwichCompatibilityGitDB
-#END handle dulwich compatibility
+# END handle dulwich compatibility
from git.test.db.dulwich.lib import DulwichRequiredMetaMixin
from odb_impl import TestObjDBPerformanceBase
+
class TestDulwichDB(TestObjDBPerformanceBase):
__metaclass__ = DulwichRequiredMetaMixin
RepoCls = DulwichCompatibilityGitDB
-
diff --git a/git/test/performance/db/test_odb_pure.py b/git/test/performance/db/test_odb_pure.py
index 93139c57..48c42659 100644
--- a/git/test/performance/db/test_odb_pure.py
+++ b/git/test/performance/db/test_odb_pure.py
@@ -1,6 +1,6 @@
from git.db.complex import PureCompatibilityGitDB
from odb_impl import TestObjDBPerformanceBase
+
class TestPureDB(TestObjDBPerformanceBase):
RepoCls = PureCompatibilityGitDB
-
diff --git a/git/test/performance/db/test_odb_pygit2.py b/git/test/performance/db/test_odb_pygit2.py
index c5911ae3..f44bfac8 100644
--- a/git/test/performance/db/test_odb_pygit2.py
+++ b/git/test/performance/db/test_odb_pygit2.py
@@ -2,12 +2,12 @@ try:
from git.db.pygit2.complex import Pygit2CompatibilityGitDB
except ImportError:
from git.db.complex import PureCompatibilityGitDB as Pygit2CompatibilityGitDB
-#END handle pygit2 compatibility
+# END handle pygit2 compatibility
from git.test.db.pygit2.lib import Pygit2RequiredMetaMixin
from odb_impl import TestObjDBPerformanceBase
+
class TestPygit2DB(TestObjDBPerformanceBase):
__metaclass__ = Pygit2RequiredMetaMixin
RepoCls = Pygit2CompatibilityGitDB
-
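
The dulwich and pygit2 modules above all open with the same guard: try to import the optional backend and, on ImportError, bind the pure-Python implementation to the same name, so the class definitions below never change. The idiom with a familiar stdlib pair standing in for the optional dependency:

try:
    import simplejson as json  # optional, faster implementation
except ImportError:
    import json  # stdlib fallback bound to the same name
# END handle import

data = json.dumps({'ok': True})  # callers never know which backend ran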
diff --git a/git/test/performance/db/test_packedodb_pure.py b/git/test/performance/db/test_packedodb_pure.py
index 90e8381f..94099b83 100644
--- a/git/test/performance/db/test_packedodb_pure.py
+++ b/git/test/performance/db/test_packedodb_pure.py
@@ -18,25 +18,27 @@ from nose import SkipTest
class CountedNullStream(NullStream):
__slots__ = '_bw'
+
def __init__(self):
self._bw = 0
-
+
def bytes_written(self):
return self._bw
-
+
def write(self, d):
self._bw += NullStream.write(self, d)
-
+
class TestPurePackedODB(TestPurePackedODBPerformanceBase):
#{ Configuration
PackedODBCls = PurePackedODB
#} END configuration
-
+
def test_pack_writing_note(self):
- sys.stderr.write("test_pack_writing should be adjusted to support different databases to read from - see test for more info")
+ sys.stderr.write(
+ "test_pack_writing should be adjusted to support different databases to read from - see test for more info")
raise SkipTest()
-
+
def test_pack_writing(self):
# see how fast we can write a pack from object streams.
# This will not be fast, as we take time for decompressing the streams as well
@@ -44,7 +46,7 @@ class TestPurePackedODB(TestPurePackedODBPerformanceBase):
ostream = CountedNullStream()
# NOTE: We use the same repo twice to see whether OS caching helps
for rorepo in (self.rorepo, self.rorepo, self.ropdb):
-
+
ni = 5000
count = 0
total_size = 0
@@ -54,22 +56,23 @@ class TestPurePackedODB(TestPurePackedODBPerformanceBase):
rorepo.stream(sha)
if count == ni:
break
- #END gather objects for pack-writing
+ # END gather objects for pack-writing
elapsed = time() - st
- print >> sys.stderr, "PDB Streaming: Got %i streams from %s by sha in in %f s ( %f streams/s )" % (count, rorepo.__class__.__name__, elapsed, count / elapsed)
-
+            print >> sys.stderr, "PDB Streaming: Got %i streams from %s by sha in %f s ( %f streams/s )" % (
+ count, rorepo.__class__.__name__, elapsed, count / elapsed)
+
st = time()
PackEntity.write_pack((rorepo.stream(sha) for sha in rorepo.sha_iter()), ostream.write, object_count=ni)
elapsed = time() - st
total_kb = ostream.bytes_written() / 1000
- print >> sys.stderr, "PDB Streaming: Wrote pack of size %i kb in %f s (%f kb/s)" % (total_kb, elapsed, total_kb/elapsed)
- #END for each rorepo
-
-
+ print >> sys.stderr, "PDB Streaming: Wrote pack of size %i kb in %f s (%f kb/s)" % (
+ total_kb, elapsed, total_kb / elapsed)
+ # END for each rorepo
+
def test_stream_reading(self):
raise SkipTest("This test was only used for --with-profile runs")
pdb = self.ropdb
-
+
# streaming only, meant for --with-profile runs
ni = 5000
count = 0
@@ -85,5 +88,5 @@ class TestPurePackedODB(TestPurePackedODBPerformanceBase):
count += 1
elapsed = time() - st
total_kib = total_size / 1000
- print >> sys.stderr, "PDB Streaming: Got %i streams by sha and read all bytes totallying %i KiB ( %f KiB / s ) in %f s ( %f streams/s )" % (ni, total_kib, total_kib/elapsed , elapsed, ni / elapsed)
-
+        print >> sys.stderr, "PDB Streaming: Got %i streams by sha and read all bytes totalling %i KiB ( %f KiB / s ) in %f s ( %f streams/s )" % (
+ ni, total_kib, total_kib / elapsed, elapsed, ni / elapsed)
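
The CountedNullStream above exists so the pack-writing benchmark can measure
throughput without paying for disk I/O. A minimal standalone sketch of the same
counting-sink idea (names here are illustrative, not the library's):

    import sys
    from time import time

    class CountingSink(object):
        """Discards everything written to it, but keeps a byte count."""
        __slots__ = ('bytes_written',)

        def __init__(self):
            self.bytes_written = 0

        def write(self, data):
            self.bytes_written += len(data)

    sink = CountingSink()
    st = time()
    for _ in range(1000):
        sink.write('x' * 4096)            # stand-in for pack data chunks
    elapsed = max(time() - st, 1e-9)      # guard against zero on coarse timers
    sys.stderr.write("wrote %i kb in %f s ( %f kb/s )\n" % (
        sink.bytes_written / 1000, elapsed, sink.bytes_written / 1000 / elapsed))
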
diff --git a/git/test/performance/lib.py b/git/test/performance/lib.py
index 2772fd7d..d01ef37e 100644
--- a/git/test/performance/lib.py
+++ b/git/test/performance/lib.py
@@ -1,9 +1,9 @@
"""Contains library functions"""
import os
from git.test.lib import (
- TestBase,
- GlobalsItemDeletorMetaCls
- )
+ TestBase,
+ GlobalsItemDeletorMetaCls
+)
import shutil
import tempfile
@@ -26,49 +26,51 @@ def resolve_or_fail(env_var):
#} END utilities
-#{ Base Classes
+#{ Base Classes
class TestBigRepoR(TestBase):
+
"""TestCase providing access to readonly 'big' repositories using the following
member variables:
-
+
* gitrorepo
-
+
* a big read-only git repository
"""
-
+
#{ Invariants
head_sha_2k = '235d521da60e4699e5bd59ac658b5b48bd76ddca'
head_sha_50 = '32347c375250fd470973a5d76185cac718955fd5'
- #} END invariants
-
+ #} END invariants
+
#{ Configuration
RepoCls = Repo
#} END configuration
-
+
@classmethod
def setUp(cls):
super(TestBigRepoR, cls).setUp()
if cls.RepoCls is None:
raise AssertionError("Require RepoCls in class %s to be set" % cls)
- #END assert configuration
+ # END assert configuration
cls.rorepo = cls.RepoCls(resolve_or_fail(k_env_git_repo))
class TestBigRepoRW(TestBigRepoR):
+
"""As above, but provides a big repository that we can write to.
-
+
Provides ``self.rwrepo``"""
-
+
@classmethod
def setUp(cls):
super(TestBigRepoRW, cls).setUp()
dirname = tempfile.mktemp()
os.mkdir(dirname)
cls.rwrepo = cls.rorepo.clone(dirname, shared=True, bare=True)
-
+
@classmethod
def tearDownAll(cls):
shutil.rmtree(cls.rwrepo.working_dir)
-
+
#} END base classes
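
Both fixture classes above locate their large test repositories through an
environment variable resolved by resolve_or_fail. The helper itself is defined
in the unshown top of this file; a plausible sketch of what it does (the actual
variable name and error type are assumptions):

    import os

    def resolve_or_fail(env_var):
        """Return the path stored in env_var, failing loudly when unset."""
        try:
            return os.environ[env_var]
        except KeyError:
            raise AssertionError("Environment variable %r must point to a repository" % env_var)

    # hypothetical usage; the real key k_env_git_repo is defined elsewhere in lib.py
    # repo_path = resolve_or_fail('GIT_PYTHON_TEST_GIT_REPO_BASE')
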
diff --git a/git/test/performance/objects/__init__.py b/git/test/performance/objects/__init__.py
index 8b137891..e69de29b 100644
--- a/git/test/performance/objects/__init__.py
+++ b/git/test/performance/objects/__init__.py
@@ -1 +0,0 @@
-
diff --git a/git/test/performance/objects/test_commit.py b/git/test/performance/objects/test_commit.py
index e342e6b3..cd8866d3 100644
--- a/git/test/performance/objects/test_commit.py
+++ b/git/test/performance/objects/test_commit.py
@@ -12,8 +12,9 @@ from cStringIO import StringIO
from time import time
import sys
+
class TestPerformance(TestBigRepoRW):
-
+
# ref with about 100 commits in its history
ref_100 = 'v0.99'
@@ -26,15 +27,15 @@ class TestPerformance(TestBigRepoRW):
c.committer_tz_offset
c.message
c.parents
-
+
def test_iteration(self):
no = 0
nc = 0
-
- # find the first commit containing the given path - always do a full
- # iteration ( restricted to the path in question ), but in fact it should
+
+ # find the first commit containing the given path - always do a full
+ # iteration ( restricted to the path in question ), but in fact it should
# return quite a lot of commits, we just take one and hence abort the operation
-
+
st = time()
for c in self.rorepo.iter_commits(self.ref_100):
nc += 1
@@ -46,8 +47,9 @@ class TestPerformance(TestBigRepoRW):
# END for each commit
elapsed_time = time() - st
assert no, "Should have traversed a few objects"
- print >> sys.stderr, "Traversed %i Trees and a total of %i unchached objects in %s [s] ( %f objs/s )" % (nc, no, elapsed_time, no/elapsed_time)
-
+        print >> sys.stderr, "Traversed %i Trees and a total of %i uncached objects in %s [s] ( %f objs/s )" % (
+ nc, no, elapsed_time, no / elapsed_time)
+
def test_commit_traversal(self):
# bound to cat-file parsing performance
nc = 0
@@ -57,8 +59,8 @@ class TestPerformance(TestBigRepoRW):
self._query_commit_info(c)
# END for each traversed commit
elapsed_time = time() - st
- print >> sys.stderr, "Traversed %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc/elapsed_time)
-
+ print >> sys.stderr, "Traversed %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc / elapsed_time)
+
def test_commit_iteration(self):
# bound to stream parsing performance
nc = 0
@@ -68,33 +70,34 @@ class TestPerformance(TestBigRepoRW):
self._query_commit_info(c)
# END for each traversed commit
elapsed_time = time() - st
- print >> sys.stderr, "Iterated %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc/elapsed_time)
-
+ print >> sys.stderr, "Iterated %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc / elapsed_time)
+
def test_commit_serialization(self):
assert_commit_serialization(self.rwrepo, self.head_sha_2k, True)
-
+
rwrepo = self.rwrepo
make_object = rwrepo.store
# direct serialization - deserialization can be tested afterwards
# serialization is probably limited on IO
hc = rwrepo.commit(self.head_sha_2k)
-
+
commits = list()
nc = 5000
st = time()
for i in xrange(nc):
- cm = Commit( rwrepo, Commit.NULL_BIN_SHA, hc.tree,
- hc.author, hc.authored_date, hc.author_tz_offset,
- hc.committer, hc.committed_date, hc.committer_tz_offset,
- str(i), parents=hc.parents, encoding=hc.encoding)
-
+ cm = Commit(rwrepo, Commit.NULL_BIN_SHA, hc.tree,
+ hc.author, hc.authored_date, hc.author_tz_offset,
+ hc.committer, hc.committed_date, hc.committer_tz_offset,
+ str(i), parents=hc.parents, encoding=hc.encoding)
+
stream = StringIO()
cm._serialize(stream)
slen = stream.tell()
stream.seek(0)
-
+
cm.binsha = make_object(IStream(Commit.type, slen, stream)).binsha
# END commit creation
elapsed = time() - st
-
- print >> sys.stderr, "Serialized %i commits to loose objects in %f s ( %f commits / s )" % (nc, elapsed, nc / elapsed)
+
+ print >> sys.stderr, "Serialized %i commits to loose objects in %f s ( %f commits / s )" % (
+ nc, elapsed, nc / elapsed)
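
Every benchmark in this file repeats the same inline pattern: start a timer,
drain an iterator while counting, then report a rate on stderr. A small helper
capturing that pattern (a sketch, not part of the test suite):

    import sys
    from time import time

    def report_rate(label, iterable):
        """Drain `iterable`, then print count, elapsed time and rate to stderr."""
        st = time()
        count = 0
        for _ in iterable:
            count += 1
        elapsed = max(time() - st, 1e-9)
        sys.stderr.write("%s: %i items in %f s ( %f items/s )\n" % (
            label, count, elapsed, count / elapsed))
        return count

    report_rate("warm-up", iter(range(100000)))
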
diff --git a/git/test/performance/test_utils.py b/git/test/performance/test_utils.py
index 8637af48..7db972f7 100644
--- a/git/test/performance/test_utils.py
+++ b/git/test/performance/test_utils.py
@@ -5,33 +5,37 @@ import stat
from lib import (
TestBigRepoR
- )
+)
class TestUtilPerformance(TestBigRepoR):
-
+
def test_access(self):
# compare dict vs. slot access
class Slotty(object):
__slots__ = "attr"
+
def __init__(self):
self.attr = 1
-
+
class Dicty(object):
+
def __init__(self):
self.attr = 1
-
+
class BigSlotty(object):
__slots__ = ('attr', ) + tuple('abcdefghijk')
+
def __init__(self):
for attr in self.__slots__:
setattr(self, attr, 1)
-
+
class BigDicty(object):
+
def __init__(self):
for attr in BigSlotty.__slots__:
setattr(self, attr, 1)
-
+
ni = 1000000
for cls in (Slotty, Dicty, BigSlotty, BigDicty):
cli = cls()
@@ -40,9 +44,10 @@ class TestUtilPerformance(TestBigRepoR):
cli.attr
# END for each access
elapsed = time() - st
- print >> sys.stderr, "Accessed %s.attr %i times in %s s ( %f acc / s)" % (cls.__name__, ni, elapsed, ni / elapsed)
+ print >> sys.stderr, "Accessed %s.attr %i times in %s s ( %f acc / s)" % (
+ cls.__name__, ni, elapsed, ni / elapsed)
# END for each class type
-
+
        # check number of sequence accesses
for cls in (list, tuple):
x = 10
@@ -55,13 +60,14 @@ class TestUtilPerformance(TestBigRepoR):
# END for
elapsed = time() - st
na = ni * 3
- print >> sys.stderr, "Accessed %s[x] %i times in %s s ( %f acc / s)" % (cls.__name__, na, elapsed, na / elapsed)
- # END for each sequence
-
+ print >> sys.stderr, "Accessed %s[x] %i times in %s s ( %f acc / s)" % (
+ cls.__name__, na, elapsed, na / elapsed)
+ # END for each sequence
+
def test_instantiation(self):
ni = 100000
max_num_items = 4
- for mni in range(max_num_items+1):
+ for mni in range(max_num_items + 1):
for cls in (tuple, list):
st = time()
for i in xrange(ni):
@@ -70,71 +76,75 @@ class TestUtilPerformance(TestBigRepoR):
elif mni == 1:
cls((1,))
elif mni == 2:
- cls((1,2))
+ cls((1, 2))
elif mni == 3:
- cls((1,2,3))
+ cls((1, 2, 3))
elif mni == 4:
- cls((1,2,3,4))
+ cls((1, 2, 3, 4))
else:
cls(x for x in xrange(mni))
# END handle empty cls
# END for each item
elapsed = time() - st
- print >> sys.stderr, "Created %i %ss of size %i in %f s ( %f inst / s)" % (ni, cls.__name__, mni, elapsed, ni / elapsed)
+ print >> sys.stderr, "Created %i %ss of size %i in %f s ( %f inst / s)" % (
+ ni, cls.__name__, mni, elapsed, ni / elapsed)
# END for each type
# END for each item count
-
+
# tuple and tuple direct
st = time()
for i in xrange(ni):
- t = (1,2,3,4)
+ t = (1, 2, 3, 4)
# END for each item
elapsed = time() - st
print >> sys.stderr, "Created %i tuples (1,2,3,4) in %f s ( %f tuples / s)" % (ni, elapsed, ni / elapsed)
-
+
st = time()
for i in xrange(ni):
- t = tuple((1,2,3,4))
+ t = tuple((1, 2, 3, 4))
# END for each item
elapsed = time() - st
print >> sys.stderr, "Created %i tuples tuple((1,2,3,4)) in %f s ( %f tuples / s)" % (ni, elapsed, ni / elapsed)
-
+
def test_unpacking_vs_indexing(self):
ni = 1000000
- list_items = [1,2,3,4]
- tuple_items = (1,2,3,4)
-
+ list_items = [1, 2, 3, 4]
+ tuple_items = (1, 2, 3, 4)
+
for sequence in (list_items, tuple_items):
st = time()
for i in xrange(ni):
one, two, three, four = sequence
            # END for each iteration
elapsed = time() - st
- print >> sys.stderr, "Unpacked %i %ss of size %i in %f s ( %f acc / s)" % (ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed)
-
+ print >> sys.stderr, "Unpacked %i %ss of size %i in %f s ( %f acc / s)" % (
+ ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed)
+
st = time()
for i in xrange(ni):
one, two, three, four = sequence[0], sequence[1], sequence[2], sequence[3]
            # END for each iteration
elapsed = time() - st
- print >> sys.stderr, "Unpacked %i %ss of size %i individually in %f s ( %f acc / s)" % (ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed)
-
+ print >> sys.stderr, "Unpacked %i %ss of size %i individually in %f s ( %f acc / s)" % (
+ ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed)
+
st = time()
for i in xrange(ni):
one, two = sequence[0], sequence[1]
            # END for each iteration
elapsed = time() - st
- print >> sys.stderr, "Unpacked %i %ss of size %i individually (2 of 4) in %f s ( %f acc / s)" % (ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed)
+ print >> sys.stderr, "Unpacked %i %ss of size %i individually (2 of 4) in %f s ( %f acc / s)" % (
+ ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed)
# END for each sequence
-
+
def test_large_list_vs_iteration(self):
# what costs more: alloc/realloc of lists, or the cpu strain of iterators ?
def slow_iter(ni):
for i in xrange(ni):
yield i
# END slow iter - be closer to the real world
-
- # alloc doesn't play a role here it seems
+
+ # alloc doesn't play a role here it seems
for ni in (500, 1000, 10000, 20000, 40000):
st = time()
for i in list(xrange(ni)):
@@ -142,7 +152,7 @@ class TestUtilPerformance(TestBigRepoR):
# END for each item
elapsed = time() - st
print >> sys.stderr, "Iterated %i items from list in %f s ( %f acc / s)" % (ni, elapsed, ni / elapsed)
-
+
st = time()
for i in slow_iter(ni):
i
@@ -150,22 +160,23 @@ class TestUtilPerformance(TestBigRepoR):
elapsed = time() - st
print >> sys.stderr, "Iterated %i items from iterator in %f s ( %f acc / s)" % (ni, elapsed, ni / elapsed)
# END for each number of iterations
-
+
def test_type_vs_inst_class(self):
class NewType(object):
pass
-
+
# lets see which way is faster
inst = NewType()
-
+
ni = 1000000
st = time()
for i in xrange(ni):
inst.__class__()
# END for each item
elapsed = time() - st
- print >> sys.stderr, "Created %i items using inst.__class__ in %f s ( %f items / s)" % (ni, elapsed, ni / elapsed)
-
+ print >> sys.stderr, "Created %i items using inst.__class__ in %f s ( %f items / s)" % (
+ ni, elapsed, ni / elapsed)
+
st = time()
for i in xrange(ni):
type(inst)()
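
The access benchmark above compares __slots__-based classes against ordinary
dict-backed ones. The measurable difference comes from slotted instances having
no per-instance __dict__, which this self-contained check demonstrates:

    import sys

    class Slotted(object):
        __slots__ = ('attr',)

        def __init__(self):
            self.attr = 1

    class DictBacked(object):

        def __init__(self):
            self.attr = 1

    assert not hasattr(Slotted(), '__dict__')     # attribute lives in a fixed slot
    assert hasattr(DictBacked(), '__dict__')      # attribute lives in a per-instance dict
    sys.stderr.write("per-instance dict costs %i bytes\n" % sys.getsizeof(DictBacked().__dict__))
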
diff --git a/git/test/refs/__init__.py b/git/test/refs/__init__.py
index 8b137891..e69de29b 100644
--- a/git/test/refs/__init__.py
+++ b/git/test/refs/__init__.py
@@ -1 +0,0 @@
-
diff --git a/git/test/refs/test_reflog.py b/git/test/refs/test_reflog.py
index 2ac19de9..8a97b5ec 100644
--- a/git/test/refs/test_reflog.py
+++ b/git/test/refs/test_reflog.py
@@ -7,6 +7,7 @@ import tempfile
import shutil
import os
+
class TestRefLog(TestBase):
def test_reflogentry(self):
@@ -14,51 +15,51 @@ class TestRefLog(TestBase):
hexsha = 'F' * 40
actor = Actor('name', 'email')
msg = "message"
-
+
self.failUnlessRaises(ValueError, RefLogEntry.new, nullhexsha, hexsha, 'noactor', 0, 0, "")
e = RefLogEntry.new(nullhexsha, hexsha, actor, 0, 1, msg)
-
+
assert e.oldhexsha == nullhexsha
assert e.newhexsha == hexsha
assert e.actor == actor
assert e.time[0] == 0
assert e.time[1] == 1
assert e.message == msg
-
+
# check representation (roughly)
assert repr(e).startswith(nullhexsha)
-
+
def test_base(self):
rlp_head = fixture_path('reflog_HEAD')
rlp_master = fixture_path('reflog_master')
tdir = tempfile.mktemp(suffix="test_reflogs")
os.mkdir(tdir)
-
- rlp_master_ro = RefLog.path(self.rorepo.head)
+
+ rlp_master_ro = RefLog.path(self.rorepo.head)
assert os.path.isfile(rlp_master_ro)
-
+
# simple read
reflog = RefLog.from_file(rlp_master_ro)
assert reflog._path is not None
assert isinstance(reflog, RefLog)
assert len(reflog)
-
+
# iter_entries works with path and with stream
assert len(list(RefLog.iter_entries(open(rlp_master))))
assert len(list(RefLog.iter_entries(rlp_master)))
-
+
# raise on invalid revlog
# TODO: Try multiple corrupted ones !
pp = 'reflog_invalid_'
for suffix in ('oldsha', 'newsha', 'email', 'date', 'sep'):
- self.failUnlessRaises(ValueError, RefLog.from_file, fixture_path(pp+suffix))
- #END for each invalid file
-
+ self.failUnlessRaises(ValueError, RefLog.from_file, fixture_path(pp + suffix))
+ # END for each invalid file
+
# cannot write an uninitialized reflog
self.failUnlessRaises(ValueError, RefLog().write)
-
+
# test serialize and deserialize - results must match exactly
- binsha = chr(255)*20
+ binsha = chr(255) * 20
msg = "my reflog message"
cr = self.rorepo.config_reader()
for rlp in (rlp_head, rlp_master):
@@ -66,35 +67,34 @@ class TestRefLog(TestBase):
tfile = os.path.join(tdir, os.path.basename(rlp))
reflog.to_file(tfile)
assert reflog.write() is reflog
-
+
# parsed result must match ...
treflog = RefLog.from_file(tfile)
assert treflog == reflog
-
+
# ... as well as each bytes of the written stream
assert open(tfile).read() == open(rlp).read()
-
+
# append an entry
entry = RefLog.append_entry(cr, tfile, IndexObject.NULL_BIN_SHA, binsha, msg)
assert entry.oldhexsha == IndexObject.NULL_HEX_SHA
- assert entry.newhexsha == 'f'*40
+ assert entry.newhexsha == 'f' * 40
assert entry.message == msg
assert RefLog.from_file(tfile)[-1] == entry
-
+
# index entry
# raises on invalid index
self.failUnlessRaises(IndexError, RefLog.entry_at, rlp, 10000)
-
+
# indices can be positive ...
assert isinstance(RefLog.entry_at(rlp, 0), RefLogEntry)
RefLog.entry_at(rlp, 23)
-
+
# ... and negative
for idx in (-1, -24):
RefLog.entry_at(rlp, idx)
- #END for each index to read
- # END for each reflog
-
-
+ # END for each index to read
+ # END for each reflog
+
# finally remove our temporary data
shutil.rmtree(tdir)
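
test_base above round-trips reflog files through RefLog. The on-disk format
being parsed is one line per entry: old sha, new sha, actor, timestamp and
timezone offset, then a tab and the message. A hand-rolled parse of one such
line, independent of the library:

    line = ("0" * 40 + " " + "f" * 40 +
            " A Committer <ac@example.com> 1234567890 +0000\tcommit (initial): message\n")

    info, message = line.rstrip('\n').split('\t', 1)
    oldhexsha, newhexsha, rest = info.split(' ', 2)
    actor, timestamp, tz = rest.rsplit(' ', 2)   # the actor field itself contains spaces
    assert oldhexsha == '0' * 40 and newhexsha == 'f' * 40
    assert actor == 'A Committer <ac@example.com>' and tz == '+0000'
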
diff --git a/git/test/refs/test_refs.py b/git/test/refs/test_refs.py
index d3716cc4..7213c119 100644
--- a/git/test/refs/test_refs.py
+++ b/git/test/refs/test_refs.py
@@ -18,6 +18,7 @@ import os
from nose import SkipTest
+
class TestRefs(TestBase):
def test_from_path(self):
@@ -27,14 +28,14 @@ class TestRefs(TestBase):
full_path = ref_type.to_full_path(name)
instance = ref_type.from_path(self.rorepo, full_path)
assert isinstance(instance, ref_type)
- # END for each name
+ # END for each name
# END for each type
-
+
# invalid path
self.failUnlessRaises(ValueError, TagReference, self.rorepo, "refs/invalid/tag")
# works without path check
TagReference(self.rorepo, "refs/invalid/tag", check_path=False)
-
+
def test_tag_base(self):
tag_object_refs = list()
for tag in TagReference.list_items(self.rorepo):
@@ -46,7 +47,7 @@ class TestRefs(TestBase):
tagobj = tag.tag
# have no dict
self.failUnlessRaises(AttributeError, setattr, tagobj, 'someattr', 1)
- assert isinstance(tagobj, TagObject)
+ assert isinstance(tagobj, TagObject)
assert tagobj.tag == tag.name
assert isinstance(tagobj.tagger, Actor)
assert isinstance(tagobj.tagged_date, int)
@@ -59,7 +60,7 @@ class TestRefs(TestBase):
# END for tag in repo-tags
assert tag_object_refs
assert isinstance(TagReference.list_items(self.rorepo)['0.1.6'], TagReference)
-
+
def test_tags(self):
# tag refs can point to tag objects or to commits
s = set()
@@ -74,8 +75,8 @@ class TestRefs(TestBase):
s.add(ref)
# END for each ref
assert len(s) == ref_count
- assert len(s|s) == ref_count
-
+ assert len(s | s) == ref_count
+
@with_rw_repo("0.1.6")
def test_heads(self, rw_repo):
for head in Head.iter_items(rw_repo):
@@ -86,7 +87,7 @@ class TestRefs(TestBase):
cur_object = head.object
assert prev_object == cur_object # represent the same git object
assert prev_object is not cur_object # but are different instances
-
+
writer = head.config_writer()
tv = "testopt"
writer.set_value(tv, 1)
@@ -94,7 +95,7 @@ class TestRefs(TestBase):
del(writer)
assert head.config_reader().get_value(tv) == 1
head.config_writer().remove_option(tv)
-
+
# after the clone, we might still have a tracking branch setup
head.set_tracking_branch(None)
assert head.tracking_branch() is None
@@ -104,7 +105,7 @@ class TestRefs(TestBase):
head.set_tracking_branch(None)
assert head.tracking_branch() is None
# END for each head
-
+
# verify REFLOG gets altered
head = HEAD(rw_repo)
cur_head = head.ref
@@ -118,75 +119,74 @@ class TestRefs(TestBase):
assert len(thlog) == hlog_len + 1
assert thlog[-1].oldhexsha == cur_commit.hexsha
assert thlog[-1].newhexsha == pcommit.hexsha
-
+
# the ref didn't change though
assert len(cur_head.log()) == blog_len
-
+
# head changes once again, cur_head doesn't change
head.set_reference(cur_head, 'reattach head')
- assert len(head.log()) == hlog_len+2
+ assert len(head.log()) == hlog_len + 2
assert len(cur_head.log()) == blog_len
-
+
        # adjusting the head-ref also adjusts the head, so both reflogs are
# altered
cur_head.set_commit(pcommit, 'changing commit')
- assert len(cur_head.log()) == blog_len+1
- assert len(head.log()) == hlog_len+3
-
-
+ assert len(cur_head.log()) == blog_len + 1
+ assert len(head.log()) == hlog_len + 3
+
# with automatic dereferencing
assert head.set_commit(cur_commit, 'change commit once again') is head
- assert len(head.log()) == hlog_len+4
- assert len(cur_head.log()) == blog_len+2
-
+ assert len(head.log()) == hlog_len + 4
+ assert len(cur_head.log()) == blog_len + 2
+
# a new branch has just a single entry
other_head = Head.create(rw_repo, 'mynewhead', pcommit, logmsg='new head created')
log = other_head.log()
assert len(log) == 1
assert log[0].oldhexsha == pcommit.NULL_HEX_SHA
assert log[0].newhexsha == pcommit.hexsha
-
+
def test_refs(self):
types_found = set()
for ref in Reference.list_items(self.rorepo):
types_found.add(type(ref))
- assert len(types_found) >= 3
-
+ assert len(types_found) >= 3
+
def test_is_valid(self):
assert Reference(self.rorepo, 'refs/doesnt/exist').is_valid() == False
assert HEAD(self.rorepo).is_valid()
assert HEAD(self.rorepo).reference.is_valid()
assert SymbolicReference(self.rorepo, 'hellothere').is_valid() == False
-
+
def test_orig_head(self):
assert type(HEAD(self.rorepo).orig_head()) == SymbolicReference
-
+
@with_rw_repo("0.1.6")
def test_head_reset(self, rw_repo):
cur_head = HEAD(rw_repo)
old_head_commit = cur_head.commit
new_head_commit = cur_head.ref.commit.parents[0]
-
- cur_head.reset(new_head_commit, index=True) # index only
+
+ cur_head.reset(new_head_commit, index=True) # index only
assert cur_head.reference.commit == new_head_commit
-
+
self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True)
new_head_commit = new_head_commit.parents[0]
cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt
assert cur_head.reference.commit == new_head_commit
-
+
# paths - make sure we have something to do
rw_repo.index.reset(old_head_commit.parents[0])
- cur_head.reset(cur_head, paths = "test")
- cur_head.reset(new_head_commit, paths = "lib")
+ cur_head.reset(cur_head, paths="test")
+ cur_head.reset(new_head_commit, paths="lib")
        # hard resets with paths don't work, it's all or nothing
- self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths = "lib")
-
+ self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths="lib")
+
# we can do a mixed reset, and then checkout from the index though
cur_head.reset(new_head_commit)
- rw_repo.index.checkout(["lib"], force=True)#
-
- # now that we have a write write repo, change the HEAD reference - its
+ rw_repo.index.checkout(["lib"], force=True)
+
+        # now that we have a writable repo, change the HEAD reference - it's
# like git-reset --soft
heads = Head.list_items(rw_repo)
assert heads
@@ -197,7 +197,7 @@ class TestRefs(TestBase):
assert cur_head.commit == head.commit
assert not cur_head.is_detached
# END for each head
-
+
# detach
active_head = heads[0]
curhead_commit = active_head.commit
@@ -205,50 +205,50 @@ class TestRefs(TestBase):
assert cur_head.commit == curhead_commit
assert cur_head.is_detached
self.failUnlessRaises(TypeError, getattr, cur_head, "reference")
-
+
# tags are references, hence we can point to them
some_tag = TagReference.list_items(rw_repo)[0]
cur_head.reference = some_tag
assert not cur_head.is_detached
assert cur_head.commit == some_tag.commit
- assert isinstance(cur_head.reference, TagReference)
-
+ assert isinstance(cur_head.reference, TagReference)
+
# put HEAD back to a real head, otherwise everything else fails
cur_head.reference = active_head
-
+
# type check
self.failUnlessRaises(ValueError, setattr, cur_head, "reference", "that")
-
- # head handling
+
+ # head handling
commit = 'HEAD'
prev_head_commit = cur_head.commit
for count, new_name in enumerate(("my_new_head", "feature/feature1")):
- actual_commit = commit+"^"*count
+ actual_commit = commit + "^" * count
new_head = Head.create(rw_repo, new_name, actual_commit)
assert new_head.is_detached
assert cur_head.commit == prev_head_commit
assert isinstance(new_head, Head)
# already exists, but has the same value, so its fine
Head.create(rw_repo, new_name, new_head.commit)
-
+
# its not fine with a different value
self.failUnlessRaises(OSError, Head.create, rw_repo, new_name, new_head.commit.parents[0])
-
+
# force it
new_head = Head.create(rw_repo, new_name, actual_commit, force=True)
old_path = new_head.path
old_name = new_head.name
-
+
assert new_head.rename("hello").name == "hello"
- assert new_head.rename("hello/world").name == "hello/world" # yes, this must work
+ assert new_head.rename("hello/world").name == "hello/world" # yes, this must work
assert new_head.rename(old_name).name == old_name and new_head.path == old_path
-
+
# rename with force
tmp_head = Head.create(rw_repo, "tmphead")
self.failUnlessRaises(GitCommandError, tmp_head.rename, new_head)
tmp_head.rename(new_head, force=True)
assert tmp_head == new_head and tmp_head.object == new_head.object
-
+
logfile = RefLog.path(tmp_head)
assert os.path.isfile(logfile)
Head.delete(rw_repo, tmp_head)
@@ -259,17 +259,17 @@ class TestRefs(TestBase):
# force on deletion testing would be missing here, code looks okay though ;)
# END for each new head name
self.failUnlessRaises(TypeError, RemoteReference.create, rw_repo, "some_name")
-
+
# tag ref
tag_name = "1.0.2"
light_tag = TagReference.create(rw_repo, tag_name)
self.failUnlessRaises(GitCommandError, TagReference.create, rw_repo, tag_name)
- light_tag = TagReference.create(rw_repo, tag_name, "HEAD~1", force = True)
+ light_tag = TagReference.create(rw_repo, tag_name, "HEAD~1", force=True)
assert isinstance(light_tag, TagReference)
assert light_tag.name == tag_name
assert light_tag.commit == cur_head.commit.parents[0]
assert light_tag.tag is None
-
+
# tag with tag object
other_tag_name = "releases/1.0.2RC"
msg = "my mighty tag\nsecond line"
@@ -278,49 +278,49 @@ class TestRefs(TestBase):
assert obj_tag.name == other_tag_name
assert obj_tag.commit == cur_head.commit
assert obj_tag.tag is not None
-
+
TagReference.delete(rw_repo, light_tag, obj_tag)
tags = rw_repo.tags
assert light_tag not in tags and obj_tag not in tags
-
+
# remote deletion
remote_refs_so_far = 0
- remotes = rw_repo.remotes
+ remotes = rw_repo.remotes
assert remotes
for remote in remotes:
refs = remote.refs
-
+
# If a HEAD exists, it must be deleted first. Otherwise it might
            # end up pointing to an invalid ref if the ref was deleted before.
remote_head_name = "HEAD"
if remote_head_name in refs:
RemoteReference.delete(rw_repo, refs[remote_head_name])
del(refs[remote_head_name])
- #END handle HEAD deletion
-
+ # END handle HEAD deletion
+
RemoteReference.delete(rw_repo, *refs)
remote_refs_so_far += len(refs)
for ref in refs:
assert ref.remote_name == remote.name
# END for each ref to delete
assert remote_refs_so_far
-
+
for remote in remotes:
# remotes without references throw
self.failUnlessRaises(AssertionError, getattr, remote, 'refs')
# END for each remote
-
+
# change where the active head points to
if cur_head.is_detached:
cur_head.reference = rw_repo.heads[0]
-
+
head = cur_head.reference
old_commit = head.commit
head.commit = old_commit.parents[0]
assert head.commit == old_commit.parents[0]
assert head.commit == cur_head.commit
head.commit = old_commit
-
+
# setting a non-commit as commit fails, but succeeds as object
head_tree = head.commit.tree
self.failUnlessRaises(ValueError, setattr, head, 'commit', head_tree)
@@ -329,8 +329,8 @@ class TestRefs(TestBase):
head.object = head_tree
assert head.object == head_tree
# cannot query tree as commit
- self.failUnlessRaises(TypeError, getattr, head, 'commit')
-
+ self.failUnlessRaises(TypeError, getattr, head, 'commit')
+
# set the commit directly using the head. This would never detach the head
assert not cur_head.is_detached
head.object = old_commit
@@ -340,58 +340,58 @@ class TestRefs(TestBase):
assert cur_head.is_detached
cur_head.commit = parent_commit
assert cur_head.is_detached and cur_head.commit == parent_commit
-
+
cur_head.reference = head
assert not cur_head.is_detached
cur_head.commit = parent_commit
assert not cur_head.is_detached
assert head.commit == parent_commit
-
+
# test checkout
active_branch = rw_repo.active_branch
for head in rw_repo.heads:
checked_out_head = head.checkout()
assert checked_out_head == head
# END for each head to checkout
-
+
# checkout with branch creation
new_head = active_branch.checkout(b="new_head")
assert active_branch != rw_repo.active_branch
assert new_head == rw_repo.active_branch
-
+
# checkout with force as we have a changed a file
# clear file
- open(new_head.commit.tree.blobs[-1].abspath,'w').close()
+ open(new_head.commit.tree.blobs[-1].abspath, 'w').close()
assert len(new_head.commit.diff(None))
-
+
# create a new branch that is likely to touch the file we changed
- far_away_head = rw_repo.create_head("far_head",'HEAD~100')
+ far_away_head = rw_repo.create_head("far_head", 'HEAD~100')
self.failUnlessRaises(GitCommandError, far_away_head.checkout)
assert active_branch == active_branch.checkout(force=True)
assert rw_repo.head.reference != far_away_head
-
+
# test reference creation
partial_ref = 'sub/ref'
full_ref = 'refs/%s' % partial_ref
ref = Reference.create(rw_repo, partial_ref)
assert ref.path == full_ref
assert ref.object == rw_repo.head.commit
-
+
self.failUnlessRaises(OSError, Reference.create, rw_repo, full_ref, 'HEAD~20')
# it works if it is at the same spot though and points to the same reference
assert Reference.create(rw_repo, full_ref, 'HEAD').path == full_ref
Reference.delete(rw_repo, full_ref)
-
+
# recreate the reference using a full_ref
ref = Reference.create(rw_repo, full_ref)
assert ref.path == full_ref
assert ref.object == rw_repo.head.commit
-
+
# recreate using force
ref = Reference.create(rw_repo, partial_ref, 'HEAD~1', force=True)
assert ref.path == full_ref
assert ref.object == rw_repo.head.commit.parents[0]
-
+
# rename it
orig_obj = ref.object
for name in ('refs/absname', 'rela_name', 'feature/rela_name'):
@@ -401,10 +401,10 @@ class TestRefs(TestBase):
assert ref_new_name.object == orig_obj
assert ref_new_name == ref
# END for each name type
-
+
# References that don't exist trigger an error if we want to access them
self.failUnlessRaises(ValueError, getattr, Reference(rw_repo, "refs/doesntexist"), 'commit')
-
+
# exists, fail unless we force
ex_ref_path = far_away_head.path
self.failUnlessRaises(OSError, ref.rename, ex_ref_path)
@@ -412,34 +412,34 @@ class TestRefs(TestBase):
far_away_head.commit = ref.commit
ref.rename(ex_ref_path)
assert ref.path == ex_ref_path and ref.object == orig_obj
- assert ref.rename(ref.path).path == ex_ref_path # rename to same name
-
+ assert ref.rename(ref.path).path == ex_ref_path # rename to same name
+
# create symbolic refs
symref_path = "symrefs/sym"
symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference)
assert symref.path == symref_path
assert symref.reference == cur_head.reference
-
+
self.failUnlessRaises(OSError, SymbolicReference.create, rw_repo, symref_path, cur_head.reference.commit)
- # it works if the new ref points to the same reference
+ # it works if the new ref points to the same reference
        assert SymbolicReference.create(rw_repo, symref.path, symref.reference).path == symref.path
SymbolicReference.delete(rw_repo, symref)
        # would raise if the symref hadn't been deleted
symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference)
-
+
# test symbolic references which are not at default locations like HEAD
# or FETCH_HEAD - they may also be at spots in refs of course
symbol_ref_path = "refs/symbol_ref"
symref = SymbolicReference(rw_repo, symbol_ref_path)
assert symref.path == symbol_ref_path
-
+
# set it
symref.reference = new_head
assert symref.reference == new_head
assert os.path.isfile(symref.abspath)
assert symref.commit == new_head.commit
-
- for name in ('absname','folder/rela_name'):
+
+ for name in ('absname', 'folder/rela_name'):
symref_new_name = symref.rename(name)
assert isinstance(symref_new_name, SymbolicReference)
assert name in symref_new_name.path
@@ -447,10 +447,10 @@ class TestRefs(TestBase):
assert symref_new_name == symref
assert not symref.is_detached
# END for each ref
-
+
# create a new non-head ref just to be sure we handle it even if packed
Reference.create(rw_repo, full_ref)
-
+
# test ref listing - assure we have packed refs
rw_repo.git.pack_refs(all=True, prune=True)
heads = rw_repo.heads
@@ -458,14 +458,14 @@ class TestRefs(TestBase):
assert new_head in heads
assert active_branch in heads
assert rw_repo.tags
-
+
# we should be able to iterate all symbolic refs as well - in that case
# we should expect only symbolic references to be returned
for symref in SymbolicReference.iter_items(rw_repo):
assert not symref.is_detached
-
+
# when iterating references, we can get references and symrefs
- # when deleting all refs, I'd expect them to be gone ! Even from
+ # when deleting all refs, I'd expect them to be gone ! Even from
# the packed ones
# For this to work, we must not be on any branch
rw_repo.head.reference = rw_repo.head.commit
@@ -477,64 +477,64 @@ class TestRefs(TestBase):
# END delete ref
# END for each ref to iterate and to delete
assert deleted_refs
-
+
for ref in Reference.iter_items(rw_repo):
if ref.is_detached:
assert ref not in deleted_refs
# END for each ref
-
- # reattach head - head will not be returned if it is not a symbolic
+
+ # reattach head - head will not be returned if it is not a symbolic
# ref
rw_repo.head.reference = Head.create(rw_repo, "master")
-
+
# At least the head should still exist
assert os.path.isfile(rw_repo.head.abspath)
refs = list(SymbolicReference.iter_items(rw_repo))
assert len(refs) == 1
-
-
+
# test creation of new refs from scratch
for path in ("basename", "dir/somename", "dir2/subdir/basename"):
- # REFERENCES
+ # REFERENCES
############
fpath = Reference.to_full_path(path)
ref_fp = Reference.from_path(rw_repo, fpath)
assert not ref_fp.is_valid()
ref = Reference(rw_repo, fpath)
assert ref == ref_fp
-
+
# can be created by assigning a commit
ref.commit = rw_repo.head.commit
assert ref.is_valid()
-
+
# if the assignment raises, the ref doesn't exist
Reference.delete(ref.repo, ref.path)
assert not ref.is_valid()
self.failUnlessRaises(ValueError, setattr, ref, 'commit', "nonsense")
assert not ref.is_valid()
-
+
# I am sure I had my reason to make it a class method at first, but
# now it doesn't make so much sense anymore, want an instance method as well
# See http://byronimo.lighthouseapp.com/projects/51787-gitpython/tickets/27
Reference.delete(ref.repo, ref.path)
assert not ref.is_valid()
-
+
ref.object = rw_repo.head.commit
assert ref.is_valid()
-
+
Reference.delete(ref.repo, ref.path)
assert not ref.is_valid()
self.failUnlessRaises(ValueError, setattr, ref, 'object', "nonsense")
assert not ref.is_valid()
-
+
# END for each path
-
+
def test_dereference_recursive(self):
# for now, just test the HEAD
assert SymbolicReference.dereference_recursive(self.rorepo, 'HEAD')
-
+
def test_reflog(self):
assert isinstance(Head.list_items(self.rorepo).master.log(), RefLog)
-
+
def test_pure_python_rename(self):
- raise SkipTest("Pure python reference renames cannot properly handle refnames which become a directory after rename")
+ raise SkipTest(
+ "Pure python reference renames cannot properly handle refnames which become a directory after rename")
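
test_head_reset and test_refs above repeatedly flip HEAD between a symbolic
reference and a detached commit. On disk that distinction is a single file:
a symbolic HEAD stores a 'ref: ' line, a detached HEAD stores a raw hex sha.
A minimal reader (a sketch; error handling omitted):

    import os

    def read_head(git_dir):
        data = open(os.path.join(git_dir, 'HEAD')).read().strip()
        if data.startswith('ref: '):
            return ('symbolic', data[5:])   # e.g. ('symbolic', 'refs/heads/master')
        return ('detached', data)           # e.g. ('detached', '4251bd59fb8e...')
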
diff --git a/git/test/test_base.py b/git/test/test_base.py
index 67f370d2..10d98b17 100644
--- a/git/test/test_base.py
+++ b/git/test/test_base.py
@@ -4,20 +4,20 @@
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
from lib import (
- TestBase,
- with_rw_repo,
- DummyStream,
- DeriveTest,
- with_rw_and_rw_remote_repo
- )
+ TestBase,
+ with_rw_repo,
+ DummyStream,
+ DeriveTest,
+ with_rw_and_rw_remote_repo
+)
import git.objects.base as base
from git.objects import (
- Blob,
- Tree,
- Commit,
- TagObject
- )
+ Blob,
+ Tree,
+ Commit,
+ TagObject
+)
import git.refs as refs
@@ -30,33 +30,34 @@ import tempfile
from git.util import (
NULL_BIN_SHA
- )
+)
from git.typ import str_blob_type
from git.base import (
- OInfo,
- OPackInfo,
- ODeltaPackInfo,
- OStream,
- OPackStream,
- ODeltaPackStream,
- IStream,
- )
+ OInfo,
+ OPackInfo,
+ ODeltaPackInfo,
+ OStream,
+ OPackStream,
+ ODeltaPackStream,
+ IStream,
+)
import os
+
class TestBase(TestBase):
-
- type_tuples = ( ("blob", "8741fc1d09d61f02ffd8cded15ff603eff1ec070", "blob.py"),
- ("tree", "3a6a5e3eeed3723c09f1ef0399f81ed6b8d82e79", "directory"),
- ("commit", "4251bd59fb8e11e40c40548cba38180a9536118c", None),
- ("tag", "e56a60e8e9cd333cfba0140a77cd12b0d9398f10", None) )
-
- def test_base_object(self):
+
+ type_tuples = (("blob", "8741fc1d09d61f02ffd8cded15ff603eff1ec070", "blob.py"),
+ ("tree", "3a6a5e3eeed3723c09f1ef0399f81ed6b8d82e79", "directory"),
+ ("commit", "4251bd59fb8e11e40c40548cba38180a9536118c", None),
+ ("tag", "e56a60e8e9cd333cfba0140a77cd12b0d9398f10", None))
+
+ def test_base_object(self):
# test interface of base object classes
types = (Blob, Tree, Commit, TagObject)
assert len(types) == len(self.type_tuples)
-
+
s = set()
num_objs = 0
num_index_objs = 0
@@ -64,9 +65,9 @@ class TestBase(TestBase):
binsha = hex_to_bin(hexsha)
item = None
if path is None:
- item = obj_type(self.rorepo,binsha)
+ item = obj_type(self.rorepo, binsha)
else:
- item = obj_type(self.rorepo,binsha, 0, path)
+ item = obj_type(self.rorepo, binsha, 0, path)
# END handle index objects
num_objs += 1
assert item.hexsha == hexsha
@@ -77,88 +78,86 @@ class TestBase(TestBase):
assert str(item) == item.hexsha
assert repr(item)
s.add(item)
-
+
if isinstance(item, base.IndexObject):
num_index_objs += 1
- if hasattr(item,'path'): # never runs here
+ if hasattr(item, 'path'): # never runs here
assert not item.path.startswith("/") # must be relative
assert isinstance(item.mode, int)
# END index object check
-
+
# read from stream
data_stream = item.data_stream
data = data_stream.read()
assert data
-
+
tmpfile = os.tmpfile()
assert item == item.stream_data(tmpfile)
tmpfile.seek(0)
assert tmpfile.read() == data
# END stream to file directly
# END for each object type to create
-
+
# each has a unique sha
assert len(s) == num_objs
- assert len(s|s) == num_objs
+ assert len(s | s) == num_objs
assert num_index_objs == 2
-
+
def test_get_object_type_by_name(self):
for tname in base.Object.TYPES:
assert base.Object in get_object_type_by_name(tname).mro()
- # END for each known type
-
+ # END for each known type
+
self.failUnlessRaises(ValueError, get_object_type_by_name, "doesntexist")
def test_object_resolution(self):
# objects must be resolved to shas so they compare equal
assert self.rorepo.head.reference.object == self.rorepo.active_branch.object
-
+
@with_rw_repo('HEAD', bare=True)
def test_with_bare_rw_repo(self, bare_rw_repo):
assert bare_rw_repo.config_reader("repository").getboolean("core", "bare")
- assert os.path.isfile(os.path.join(bare_rw_repo.git_dir,'HEAD'))
-
+ assert os.path.isfile(os.path.join(bare_rw_repo.git_dir, 'HEAD'))
+
@with_rw_repo('0.1.6')
def test_with_rw_repo(self, rw_repo):
assert not rw_repo.config_reader("repository").getboolean("core", "bare")
- assert os.path.isdir(os.path.join(rw_repo.working_tree_dir,'lib'))
-
+ assert os.path.isdir(os.path.join(rw_repo.working_tree_dir, 'lib'))
+
@with_rw_and_rw_remote_repo('0.1.6')
def test_with_rw_remote_and_rw_repo(self, rw_repo, rw_remote_repo):
assert not rw_repo.config_reader("repository").getboolean("core", "bare")
assert rw_remote_repo.config_reader("repository").getboolean("core", "bare")
- assert os.path.isdir(os.path.join(rw_repo.working_tree_dir,'lib'))
-
-
+ assert os.path.isdir(os.path.join(rw_repo.working_tree_dir, 'lib'))
+
class TestBaseTypes(TestBase):
-
+
def test_streams(self):
# test info
sha = NULL_BIN_SHA
s = 20
blob_id = 3
-
+
info = OInfo(sha, str_blob_type, s)
assert info.binsha == sha
assert info.type == str_blob_type
assert info.type_id == blob_id
assert info.size == s
-
+
# test pack info
# provides type_id
pinfo = OPackInfo(0, blob_id, s)
assert pinfo.type == str_blob_type
assert pinfo.type_id == blob_id
assert pinfo.pack_offset == 0
-
+
dpinfo = ODeltaPackInfo(0, blob_id, s, sha)
assert dpinfo.type == str_blob_type
assert dpinfo.type_id == blob_id
assert dpinfo.delta_info == sha
assert dpinfo.pack_offset == 0
-
-
+
# test ostream
stream = DummyStream()
ostream = OStream(*(info + (stream, )))
@@ -168,33 +167,33 @@ class TestBaseTypes(TestBase):
assert stream.bytes == 15
ostream.read(20)
assert stream.bytes == 20
-
+
# test packstream
postream = OPackStream(*(pinfo + (stream, )))
assert postream.stream is stream
postream.read(10)
stream._assert()
assert stream.bytes == 10
-
+
# test deltapackstream
dpostream = ODeltaPackStream(*(dpinfo + (stream, )))
        assert dpostream.stream is stream
dpostream.read(5)
stream._assert()
assert stream.bytes == 5
-
+
# derive with own args
- DeriveTest(sha, str_blob_type, s, stream, 'mine',myarg = 3)._assert()
-
+ DeriveTest(sha, str_blob_type, s, stream, 'mine', myarg=3)._assert()
+
# test istream
istream = IStream(str_blob_type, s, stream)
assert istream.binsha == None
istream.binsha = sha
assert istream.binsha == sha
-
+
assert len(istream.binsha) == 20
assert len(istream.hexsha) == 40
-
+
assert istream.size == s
istream.size = s * 2
        assert istream.size == s * 2
@@ -204,9 +203,7 @@ class TestBaseTypes(TestBase):
assert istream.stream is stream
istream.stream = None
assert istream.stream is None
-
+
assert istream.error is None
istream.error = Exception()
assert isinstance(istream.error, Exception)
-
-
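
A detail worth noting in test_streams above: OStream is constructed as
OStream(*(info + (stream, ))), meaning the O*-info types concatenate like
tuples while still exposing named accessors. A minimal sketch of that pattern
(not the library's actual implementation):

    class Info(tuple):
        """Tuple subclass with named read-only fields, mirroring the O*-type style."""
        __slots__ = ()

        def __new__(cls, binsha, typename, size):
            return tuple.__new__(cls, (binsha, typename, size))

        binsha = property(lambda self: self[0])
        type = property(lambda self: self[1])
        size = property(lambda self: self[2])

    info = Info('\0' * 20, 'blob', 20)
    assert info.type == 'blob' and info.size == 20
    assert len(info + ('stream', )) == 4    # still concatenates like a plain tuple
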
diff --git a/git/test/test_cmd.py b/git/test/test_cmd.py
index 5f59c200..adc5173a 100644
--- a/git/test/test_cmd.py
+++ b/git/test/test_cmd.py
@@ -4,18 +4,20 @@
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
-import os, sys
+import os
+import sys
from git.test.lib import (
- TestBase,
- patch,
- raises,
- assert_equal,
- assert_true,
- assert_match,
- fixture_path
- )
+ TestBase,
+ patch,
+ raises,
+ assert_equal,
+ assert_true,
+ assert_match,
+ fixture_path
+)
from git import Git, GitCommandError
+
class TestGit(TestBase):
@classmethod
@@ -42,7 +44,6 @@ class TestGit(TestBase):
def test_it_raises_errors(self):
self.git.this_does_not_exist()
-
def test_it_transforms_kwargs_into_git_command_arguments(self):
assert_equal(["-s"], self.git.transform_kwargs(**{'s': True}))
assert_equal(["-s5"], self.git.transform_kwargs(**{'s': 5}))
@@ -53,7 +54,7 @@ class TestGit(TestBase):
assert_equal(["-s", "-t"], self.git.transform_kwargs(**{'s': True, 't': True}))
def test_it_executes_git_to_shell_and_returns_result(self):
- assert_match('^git version [\d\.]{2}.*$', self.git.execute(["git","version"]))
+ assert_match('^git version [\d\.]{2}.*$', self.git.execute(["git", "version"]))
def test_it_accepts_stdin(self):
filename = fixture_path("cat_file_blob")
@@ -72,13 +73,13 @@ class TestGit(TestBase):
# read header only
import subprocess as sp
hexsha = "b2339455342180c7cc1e9bba3e9f181f7baa5167"
- g = self.git.cat_file(batch_check=True, istream=sp.PIPE,as_process=True)
+ g = self.git.cat_file(batch_check=True, istream=sp.PIPE, as_process=True)
g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
g.stdin.flush()
obj_info = g.stdout.readline()
# read header + data
- g = self.git.cat_file(batch=True, istream=sp.PIPE,as_process=True)
+ g = self.git.cat_file(batch=True, istream=sp.PIPE, as_process=True)
g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n")
g.stdin.flush()
obj_info_two = g.stdout.readline()
@@ -94,9 +95,8 @@ class TestGit(TestBase):
g.stdin.flush()
assert g.stdout.readline() == obj_info
-
        # same can be achieved using the respective command functions
- hexsha, typename, size = self.git.get_object_header(hexsha)
+ hexsha, typename, size = self.git.get_object_header(hexsha)
hexsha, typename_two, size_two, data = self.git.get_object_data(hexsha)
assert typename == typename_two and size == size_two
@@ -105,17 +105,18 @@ class TestGit(TestBase):
assert isinstance(v, tuple)
for n in v:
assert isinstance(n, int)
- #END verify number types
+ # END verify number types
def test_cmd_override(self):
prev_cmd = self.git.GIT_PYTHON_GIT_EXECUTABLE
try:
            # set it to something that doesn't exist, assure it raises
- type(self.git).GIT_PYTHON_GIT_EXECUTABLE = os.path.join("some", "path", "which", "doesn't", "exist", "gitbinary")
+ type(self.git).GIT_PYTHON_GIT_EXECUTABLE = os.path.join(
+ "some", "path", "which", "doesn't", "exist", "gitbinary")
self.failUnlessRaises(OSError, self.git.version)
finally:
type(self.git).GIT_PYTHON_GIT_EXECUTABLE = prev_cmd
- #END undo adjustment
+ # END undo adjustment
def test_output_strip(self):
import subprocess as sp
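
The cat_file tests above drive git's batch protocol over stdin/stdout pipes.
The raw equivalent with plain subprocess, assuming the working directory is
inside a git repository:

    import subprocess as sp

    proc = sp.Popen(['git', 'cat-file', '--batch-check'], stdin=sp.PIPE, stdout=sp.PIPE)
    proc.stdin.write(b'HEAD\n')        # one object name per line
    proc.stdin.flush()
    line = proc.stdout.readline()      # b'<hexsha> commit <size>\n' on success
    assert line.split()[1] == b'commit'
    proc.stdin.close()
    proc.wait()
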
diff --git a/git/test/test_config.py b/git/test/test_config.py
index b37db290..b00240b0 100644
--- a/git/test/test_config.py
+++ b/git/test/test_config.py
@@ -10,36 +10,37 @@ from git.config import *
from copy import copy
from ConfigParser import NoSectionError
+
class TestConfig(TestBase):
-
+
def _to_memcache(self, file_path):
fp = open(file_path, "r")
sio = StringIO.StringIO(fp.read())
sio.name = file_path
return sio
-
+
def _parsers_equal_or_raise(self, lhs, rhs):
pass
-
+
def test_read_write(self):
# writer must create the exact same file as the one read before
for filename in ("git_config", "git_config_global"):
file_obj = self._to_memcache(fixture_path(filename))
file_obj_orig = copy(file_obj)
- w_config = GitConfigParser(file_obj, read_only = False)
+ w_config = GitConfigParser(file_obj, read_only=False)
w_config.read() # enforce reading
assert w_config._sections
w_config.write() # enforce writing
-
+
# we stripped lines when reading, so the results differ
assert file_obj.getvalue() != file_obj_orig.getvalue()
-
+
# creating an additional config writer must fail due to exclusive access
- self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only = False)
-
+ self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only=False)
+
# should still have a lock and be able to make changes
assert w_config._lock._has_lock()
-
+
# changes should be written right away
sname = "my_section"
oname = "mykey"
@@ -47,23 +48,23 @@ class TestConfig(TestBase):
w_config.add_section(sname)
assert w_config.has_section(sname)
w_config.set(sname, oname, val)
- assert w_config.has_option(sname,oname)
+ assert w_config.has_option(sname, oname)
assert w_config.get(sname, oname) == val
-
+
sname_new = "new_section"
oname_new = "new_key"
ival = 10
w_config.set_value(sname_new, oname_new, ival)
assert w_config.get_value(sname_new, oname_new) == ival
-
+
file_obj.seek(0)
r_config = GitConfigParser(file_obj, read_only=True)
- #print file_obj.getvalue()
+ # print file_obj.getvalue()
assert r_config.has_section(sname)
assert r_config.has_option(sname, oname)
assert r_config.get(sname, oname) == val
# END for each filename
-
+
def test_base(self):
path_repo = fixture_path("git_config")
path_global = fixture_path("git_config_global")
@@ -71,7 +72,7 @@ class TestConfig(TestBase):
assert r_config.read_only
num_sections = 0
num_options = 0
-
+
# test reader methods
assert r_config._is_initialized == False
for section in r_config.sections():
@@ -84,27 +85,27 @@ class TestConfig(TestBase):
assert val
assert "\n" not in option
assert "\n" not in val
-
+
# writing must fail
self.failUnlessRaises(IOError, r_config.set, section, option, None)
- self.failUnlessRaises(IOError, r_config.remove_option, section, option )
+ self.failUnlessRaises(IOError, r_config.remove_option, section, option)
# END for each option
self.failUnlessRaises(IOError, r_config.remove_section, section)
- # END for each section
+ # END for each section
assert num_sections and num_options
assert r_config._is_initialized == True
-
+
        # get value which doesn't exist, with default
default = "my default value"
assert r_config.get_value("doesnt", "exist", default) == default
-
+
# it raises if there is no default though
self.failUnlessRaises(NoSectionError, r_config.get_value, "doesnt", "exist")
-
+
def test_values(self):
file_obj = self._to_memcache(fixture_path("git_config_values"))
- w_config = GitConfigParser(file_obj, read_only = False)
- w_config.write() # enforce writing
+ w_config = GitConfigParser(file_obj, read_only=False)
+ w_config.write() # enforce writing
orig_value = file_obj.getvalue()
# Reading must unescape backslashes
@@ -122,7 +123,7 @@ class TestConfig(TestBase):
# Writing must escape backslashes and quotes
w_config.set('values', 'backslash', backslash)
w_config.set('values', 'quote', quote)
- w_config.write() # enforce writing
+ w_config.write() # enforce writing
# Contents shouldn't differ
assert file_obj.getvalue() == orig_value
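
test_values above asserts that backslashes and quotes survive a read/write
round trip. The underlying rule is plain escaping on write and unescaping on
read; a simplified sketch (the parser in git.config handles more cases):

    def escape_value(value):
        return value.replace('\\', '\\\\').replace('"', '\\"')

    def unescape_value(value):
        return value.replace('\\"', '"').replace('\\\\', '\\')

    v = 'a\\path with "quotes"'
    assert unescape_value(escape_value(v)) == v
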
diff --git a/git/test/test_diff.py b/git/test/test_diff.py
index 98e72d6c..e55cbecc 100644
--- a/git/test/test_diff.py
+++ b/git/test/test_diff.py
@@ -5,17 +5,18 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
from git.test.lib import (
- TestBase,
- StringProcessAdapter,
- fixture,
- assert_equal,
- assert_true
- )
+ TestBase,
+ StringProcessAdapter,
+ fixture,
+ assert_equal,
+ assert_true
+)
from git.diff import *
+
class TestDiff(TestBase):
-
+
def _assert_diff_format(self, diffs):
# verify that the format of the diff is sane
for diff in diffs:
@@ -23,19 +24,19 @@ class TestDiff(TestBase):
assert isinstance(diff.a_mode, int)
if diff.b_mode:
assert isinstance(diff.b_mode, int)
-
+
if diff.a_blob:
assert not diff.a_blob.path.endswith('\n')
if diff.b_blob:
assert not diff.b_blob.path.endswith('\n')
# END for each diff
return diffs
-
+
def test_list_from_string_new_mode(self):
output = StringProcessAdapter(fixture('diff_new_mode'))
diffs = Diff._index_from_patch_format(self.rorepo, output.stdout)
self._assert_diff_format(diffs)
-
+
assert_equal(1, len(diffs))
assert_equal(10, len(diffs[0].diff.splitlines()))
@@ -43,7 +44,7 @@ class TestDiff(TestBase):
output = StringProcessAdapter(fixture('diff_rename'))
diffs = Diff._index_from_patch_format(self.rorepo, output.stdout)
self._assert_diff_format(diffs)
-
+
assert_equal(1, len(diffs))
diff = diffs[0]
@@ -67,10 +68,10 @@ class TestDiff(TestBase):
def test_diff_patch_format(self):
        # test all of the 'old' format diffs for completeness - it should at least
# be able to deal with it
- fixtures = ("diff_2", "diff_2f", "diff_f", "diff_i", "diff_mode_only",
- "diff_new_mode", "diff_numstat", "diff_p", "diff_rename",
- "diff_tree_numstat_root" )
-
+ fixtures = ("diff_2", "diff_2f", "diff_f", "diff_i", "diff_mode_only",
+ "diff_new_mode", "diff_numstat", "diff_p", "diff_rename",
+ "diff_tree_numstat_root")
+
for fixture_name in fixtures:
diff_proc = StringProcessAdapter(fixture(fixture_name))
diffs = Diff._index_from_patch_format(self.rorepo, diff_proc.stdout)
@@ -81,24 +82,24 @@ class TestDiff(TestBase):
assertion_map = dict()
for i, commit in enumerate(self.rorepo.iter_commits('0.1.6', max_count=2)):
diff_item = commit
- if i%2 == 0:
+ if i % 2 == 0:
diff_item = commit.tree
# END use tree every second item
-
+
for other in (None, commit.Index, commit.parents[0]):
for paths in (None, "CHANGES", ("CHANGES", "lib")):
for create_patch in range(2):
diff_index = diff_item.diff(other, paths, create_patch)
assert isinstance(diff_index, DiffIndex)
-
+
if diff_index:
self._assert_diff_format(diff_index)
for ct in DiffIndex.change_type:
- key = 'ct_%s'%ct
+ key = 'ct_%s' % ct
assertion_map.setdefault(key, 0)
- assertion_map[key] = assertion_map[key]+len(list(diff_index.iter_change_type(ct)))
+ assertion_map[key] = assertion_map[key] + len(list(diff_index.iter_change_type(ct)))
# END for each changetype
-
+
# check entries
diff_set = set()
diff_set.add(diff_index[0])
@@ -106,7 +107,7 @@ class TestDiff(TestBase):
assert len(diff_set) == 1
assert diff_index[0] == diff_index[0]
assert not (diff_index[0] != diff_index[0])
- # END diff index checking
+ # END diff index checking
# END for each patch option
# END for each path option
# END for each other side
@@ -119,18 +120,16 @@ class TestDiff(TestBase):
assert len(rename_diffs) == 3
assert rename_diffs[0].rename_from == rename_diffs[0].a_blob.path
assert rename_diffs[0].rename_to == rename_diffs[0].b_blob.path
-
- # assert we could always find at least one instance of the members we
+
+ # assert we could always find at least one instance of the members we
# can iterate in the diff index - if not this indicates its not working correctly
# or our test does not span the whole range of possibilities
- for key,value in assertion_map.items():
+ for key, value in assertion_map.items():
assert value, "Did not find diff for %s" % key
- # END for each iteration type
-
+ # END for each iteration type
+
# test path not existing in the index - should be ignored
c = self.rorepo.head.commit
cp = c.parents[0]
diff_index = c.diff(cp, ["does/not/exist"])
assert len(diff_index) == 0
-
-
diff --git a/git/test/test_example.py b/git/test/test_example.py
index 1fd87b3f..8a80aac8 100644
--- a/git/test/test_example.py
+++ b/git/test/test_example.py
@@ -7,21 +7,22 @@ from lib import TestBase, fixture_path
from git.base import IStream
from git.db.py.loose import PureLooseObjectODB
from git.util import pool
-
+
from cStringIO import StringIO
from async import IteratorReader
-
+
+
class TestExamples(TestBase):
-
+
def test_base(self):
ldb = PureLooseObjectODB(fixture_path("../../../.git/objects"))
-
+
for sha1 in ldb.sha_iter():
oinfo = ldb.info(sha1)
ostream = ldb.stream(sha1)
assert oinfo[:3] == ostream[:3]
-
+
assert len(ostream.read()) == ostream.size
assert ldb.has_object(oinfo.binsha)
# END for each sha in database
@@ -32,33 +33,32 @@ class TestExamples(TestBase):
except UnboundLocalError:
pass
# END ignore exception if there are no loose objects
-
+
data = "my data"
istream = IStream("blob", len(data), StringIO(data))
-
+
# the object does not yet have a sha
assert istream.binsha is None
ldb.store(istream)
# now the sha is set
assert len(istream.binsha) == 20
assert ldb.has_object(istream.binsha)
-
-
+
# async operation
# Create a reader from an iterator
reader = IteratorReader(ldb.sha_iter())
-
+
# get reader for object streams
info_reader = ldb.stream_async(reader)
-
+
# read one
info = info_reader.read(1)[0]
-
+
# read all the rest until depletion
ostreams = info_reader.read()
-
+
# set the pool to use two threads
pool.set_size(2)
-
+
# synchronize the mode of operation
pool.set_size(0)
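
What ldb.store(istream) above boils down to for a blob: git prefixes the
payload with a '<type> <size>\0' header, SHA-1s the result to obtain the
20-byte binsha, and zlib-deflates header plus payload for storage under
.git/objects/. A sketch of the hashing step using only the standard library:

    import hashlib
    import zlib

    data = b"my data"
    store = (b"blob %d\x00" % len(data)) + data   # header, then payload
    binsha = hashlib.sha1(store).digest()         # the 20-byte binsha seen above
    compressed = zlib.compress(store)             # the bytes that land on disk
    assert len(binsha) == 20
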
diff --git a/git/test/test_fun.py b/git/test/test_fun.py
index 15bc20ed..ecefd86f 100644
--- a/git/test/test_fun.py
+++ b/git/test/test_fun.py
@@ -1,29 +1,30 @@
from git.test.lib import TestBase, with_rw_repo
from git.objects.fun import (
- traverse_tree_recursive,
- traverse_trees_recursive,
- tree_to_stream
- )
+ traverse_tree_recursive,
+ traverse_trees_recursive,
+ tree_to_stream
+)
from git.index.fun import (
- aggressive_tree_merge
- )
+ aggressive_tree_merge
+)
from git.util import bin_to_hex
from git.base import IStream
from git.typ import str_tree_type
from stat import (
- S_IFDIR,
- S_IFREG,
- S_IFLNK
- )
+ S_IFDIR,
+ S_IFREG,
+ S_IFLNK
+)
from git.index import IndexFile
from cStringIO import StringIO
+
class TestFun(TestBase):
-
+
def _assert_index_entries(self, entries, trees):
index = IndexFile.from_tree(self.rorepo, *[self.rorepo.tree(bin_to_hex(t)) for t in trees])
assert entries
@@ -31,22 +32,22 @@ class TestFun(TestBase):
for entry in entries:
assert (entry.path, entry.stage) in index.entries
# END assert entry matches fully
-
+
def test_aggressive_tree_merge(self):
# head tree with additions, removals and modification compared to its predecessor
odb = self.rorepo.odb
- HC = self.rorepo.commit("6c1faef799095f3990e9970bc2cb10aa0221cf9c")
+ HC = self.rorepo.commit("6c1faef799095f3990e9970bc2cb10aa0221cf9c")
H = HC.tree
B = HC.parents[0].tree
-
+
# entries from single tree
trees = [H.binsha]
self._assert_index_entries(aggressive_tree_merge(odb, trees), trees)
-
+
# from multiple trees
trees = [B.binsha, H.binsha]
self._assert_index_entries(aggressive_tree_merge(odb, trees), trees)
-
+
# three way, no conflict
tree = self.rorepo.tree
B = tree("35a09c0534e89b2d43ec4101a5fb54576b577905")
@@ -54,16 +55,16 @@ class TestFun(TestBase):
M = tree("1f2b19de3301e76ab3a6187a49c9c93ff78bafbd")
trees = [B.binsha, H.binsha, M.binsha]
self._assert_index_entries(aggressive_tree_merge(odb, trees), trees)
-
+
# three-way, conflict in at least one file, both modified
B = tree("a7a4388eeaa4b6b94192dce67257a34c4a6cbd26")
H = tree("f9cec00938d9059882bb8eabdaf2f775943e00e5")
M = tree("44a601a068f4f543f73fd9c49e264c931b1e1652")
trees = [B.binsha, H.binsha, M.binsha]
self._assert_index_entries(aggressive_tree_merge(odb, trees), trees)
-
+
# too many trees
- self.failUnlessRaises(ValueError, aggressive_tree_merge, odb, trees*2)
+ self.failUnlessRaises(ValueError, aggressive_tree_merge, odb, trees * 2)
def mktree(self, odb, entries):
"""create a tree from the given tree entries and safe it to the database"""
@@ -72,122 +73,123 @@ class TestFun(TestBase):
sio.seek(0)
istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
return istream.binsha
-
+
@with_rw_repo('0.1.6')
def test_three_way_merge(self, rwrepo):
def mkfile(name, sha, executable=0):
- return (sha, S_IFREG | 0644 | executable*0111, name)
+ return (sha, S_IFREG | 0644 | executable * 0111, name)
+
def mkcommit(name, sha):
return (sha, S_IFDIR | S_IFLNK, name)
+
def assert_entries(entries, num_entries, has_conflict=False):
assert len(entries) == num_entries
assert has_conflict == (len([e for e in entries if e.stage != 0]) > 0)
mktree = self.mktree
-
- shaa = "\1"*20
- shab = "\2"*20
- shac = "\3"*20
-
+
+ shaa = "\1" * 20
+ shab = "\2" * 20
+ shac = "\3" * 20
+
odb = rwrepo.odb
-
+
# base tree
bfn = 'basefile'
fbase = mkfile(bfn, shaa)
tb = mktree(odb, [fbase])
-
+
# non-conflicting new files, same data
fa = mkfile('1', shab)
th = mktree(odb, [fbase, fa])
fb = mkfile('2', shac)
tm = mktree(odb, [fbase, fb])
-
+
# two new files, same base file
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 3)
-
+
# both delete same file, add own one
fa = mkfile('1', shab)
th = mktree(odb, [fa])
fb = mkfile('2', shac)
tm = mktree(odb, [fb])
-
+
# two new files
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 2)
-
+
# same file added in both, differently
fa = mkfile('1', shab)
th = mktree(odb, [fa])
fb = mkfile('1', shac)
tm = mktree(odb, [fb])
-
+
# expect conflict
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 2, True)
-
+
# same file added, different mode
fa = mkfile('1', shab)
th = mktree(odb, [fa])
fb = mkcommit('1', shab)
tm = mktree(odb, [fb])
-
+
# expect conflict
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 2, True)
-
+
# same file added in both
fa = mkfile('1', shab)
th = mktree(odb, [fa])
fb = mkfile('1', shab)
tm = mktree(odb, [fb])
-
+
# expect conflict
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 1)
-
+
# modify same base file, differently
fa = mkfile(bfn, shab)
th = mktree(odb, [fa])
fb = mkfile(bfn, shac)
tm = mktree(odb, [fb])
-
+
# conflict, 3 versions on 3 stages
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 3, True)
-
-
+
# change mode on same base file, by making one a commit, the other executable
# no content change ( this is totally unlikely to happen in the real world )
fa = mkcommit(bfn, shaa)
th = mktree(odb, [fa])
fb = mkfile(bfn, shaa, executable=1)
tm = mktree(odb, [fb])
-
+
# conflict, 3 versions on 3 stages, because of different mode
trees = [tb, th, tm]
assert_entries(aggressive_tree_merge(odb, trees), 3, True)
-
+
for is_them in range(2):
# only we/they change contents
fa = mkfile(bfn, shab)
th = mktree(odb, [fa])
-
+
trees = [tb, th, tb]
if is_them:
trees = [tb, tb, th]
entries = aggressive_tree_merge(odb, trees)
assert len(entries) == 1 and entries[0].binsha == shab
-
+
# only we/they change the mode
fa = mkcommit(bfn, shaa)
th = mktree(odb, [fa])
-
+
trees = [tb, th, tb]
if is_them:
trees = [tb, tb, th]
entries = aggressive_tree_merge(odb, trees)
assert len(entries) == 1 and entries[0].binsha == shaa and entries[0].mode == fa[1]
-
+
# one side deletes, the other changes = conflict
fa = mkfile(bfn, shab)
th = mktree(odb, [fa])
@@ -198,16 +200,16 @@ class TestFun(TestBase):
# as one is deleted, there are only 2 entries
assert_entries(aggressive_tree_merge(odb, trees), 2, True)
# END handle ours, theirs
-
+
def _assert_tree_entries(self, entries, num_trees):
for entry in entries:
assert len(entry) == num_trees
paths = set(e[2] for e in entry if e)
-
+
# only one path per set of entries
assert len(paths) == 1
# END verify entry
-
+
def test_tree_traversal(self):
        # low level tree traversal
odb = self.rorepo.odb
@@ -215,29 +217,29 @@ class TestFun(TestBase):
M = self.rorepo.tree('e14e3f143e7260de9581aee27e5a9b2645db72de') # merge tree
B = self.rorepo.tree('f606937a7a21237c866efafcad33675e6539c103') # base tree
B_old = self.rorepo.tree('1f66cfbbce58b4b552b041707a12d437cc5f400a') # old base tree
-
+
# two very different trees
entries = traverse_trees_recursive(odb, [B_old.binsha, H.binsha], '')
self._assert_tree_entries(entries, 2)
-
+
oentries = traverse_trees_recursive(odb, [H.binsha, B_old.binsha], '')
assert len(oentries) == len(entries)
self._assert_tree_entries(oentries, 2)
-
+
# single tree
is_no_tree = lambda i, d: i.type != 'tree'
entries = traverse_trees_recursive(odb, [B.binsha], '')
assert len(entries) == len(list(B.traverse(predicate=is_no_tree)))
self._assert_tree_entries(entries, 1)
-
+
# two trees
entries = traverse_trees_recursive(odb, [B.binsha, H.binsha], '')
self._assert_tree_entries(entries, 2)
-
+
        # three trees
entries = traverse_trees_recursive(odb, [B.binsha, H.binsha, M.binsha], '')
self._assert_tree_entries(entries, 3)
-
+
def test_tree_traversal_single(self):
max_count = 50
count = 0
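
Every case in test_three_way_merge above reduces to the same call shape:
aggressive_tree_merge takes an object database and one to three tree shas
(base, ours, theirs) and returns index entries, with a non-zero stage marking a
conflict. A minimal sketch built from the same helpers; the database directory
is a placeholder:

    from cStringIO import StringIO
    from stat import S_IFREG

    from git.base import IStream
    from git.db.py.loose import PureLooseObjectODB
    from git.index.fun import aggressive_tree_merge
    from git.objects.fun import tree_to_stream
    from git.typ import str_tree_type

    def mktree(odb, entries):
        # serialize (binsha, mode, name) entries and store the tree object
        sio = StringIO()
        tree_to_stream(entries, sio.write)
        sio.seek(0)
        return odb.store(IStream(str_tree_type, len(sio.getvalue()), sio)).binsha

    odb = PureLooseObjectODB('/tmp/objects')    # placeholder directory
    shaa, shab = "\1" * 20, "\2" * 20           # fake blob shas, as in the test
    tb = mktree(odb, [(shaa, S_IFREG | 0644, 'file')])  # base tree
    th = mktree(odb, [(shab, S_IFREG | 0644, 'file')])  # 'ours' changed the file

    # only one side changed the content: resolves to shab without a conflict
    entries = aggressive_tree_merge(odb, [tb, th, tb])
    assert len(entries) == 1 and entries[0].binsha == shab
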
diff --git a/git/test/test_import.py b/git/test/test_import.py
index 606d4b03..49e04028 100644
--- a/git/test/test_import.py
+++ b/git/test/test_import.py
@@ -10,7 +10,8 @@ import os
from git import *
-def import_all(topdir, topmodule='git', skip = "test"):
+
+def import_all(topdir, topmodule='git', skip="test"):
base = os.path.basename
join = os.path.join
init_script = '__init__.py'
@@ -21,37 +22,37 @@ def import_all(topdir, topmodule='git', skip = "test"):
if init_script not in files:
del(dirs[:])
continue
- #END ignore non-packages
-
+ # END ignore non-packages
+
if skip in root:
continue
- #END handle ignores
-
+ # END handle ignores
+
for relafile in files:
if not relafile.endswith('.py'):
continue
if relafile == init_script:
continue
module_path = join(root, os.path.splitext(relafile)[0]).replace("/", ".").replace("\\", ".")
-
+
m = __import__(module_path, globals(), locals(), [""])
try:
attrlist = m.__all__
for attr in attrlist:
- assert hasattr(m, attr), "Invalid item in %s.__all__: %s" % (module_path, attr)
- #END veriy
+ assert hasattr(m, attr), "Invalid item in %s.__all__: %s" % (module_path, attr)
+                    # END verify
except AttributeError:
pass
# END try each listed attribute
- #END for each file in dir
- #END for each item
+ # END for each file in dir
+ # END for each item
finally:
os.chdir(prev_cwd)
- #END handle previous currentdir
-
-
+ # END handle previous currentdir
+
class TestDummy(object):
+
def test_base(self):
dn = os.path.dirname
        # NOTE: I don't think this is working, as the __all__ variable is not used in this case
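
import_all is meant to be pointed at a source checkout: it imports every
package module below topdir and checks that each name listed in a module's
__all__ actually resolves. A usage sketch; deriving the checkout root from the
test module's location is an assumption about the layout:

    import os
    from git.test import test_import

    dn = os.path.dirname
    # git/test/test_import.py lives three levels below the checkout root
    test_import.import_all(dn(dn(dn(test_import.__file__))))
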
diff --git a/git/test/test_index.py b/git/test/test_index.py
index 029c961b..76d43cbf 100644
--- a/git/test/test_index.py
+++ b/git/test/test_index.py
@@ -5,11 +5,11 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
from git.test.lib import (
- TestBase,
- with_rw_repo,
- fixture_path,
- fixture
- )
+ TestBase,
+ with_rw_repo,
+ fixture_path,
+ fixture
+)
from git import *
import inspect
import os
@@ -20,12 +20,13 @@ import shutil
import time
from stat import *
+
class TestIndex(TestBase):
-
+
def __init__(self, *args):
super(TestIndex, self).__init__(*args)
self._reset_progress()
-
+
def _assert_fprogress(self, entries):
assert len(entries) == len(self._fprogress_map)
for path, call_count in self._fprogress_map.iteritems():
@@ -41,48 +42,48 @@ class TestIndex(TestBase):
if curval == 1:
assert done
self._fprogress_map[path] = curval + 1
-
+
def _fprogress_add(self, path, done, item):
"""Called as progress func - we keep track of the proper
call order"""
assert item is not None
self._fprogress(path, done, item)
-
+
def _reset_progress(self):
# maps paths to the count of calls
self._fprogress_map = dict()
-
+
def _assert_entries(self, entries):
for entry in entries:
assert isinstance(entry, BaseIndexEntry)
assert not os.path.isabs(entry.path)
assert not "\\" in entry.path
# END for each entry
-
+
def test_index_file_base(self):
# read from file
index = IndexFile(self.rorepo, fixture_path("index"))
assert index.entries
assert index.version > 0
-
+
# test entry
last_val = None
entry = index.entries.itervalues().next()
- for attr in ("path","ctime","mtime","dev","inode","mode","uid",
- "gid","size","binsha", "hexsha", "stage"):
+ for attr in ("path", "ctime", "mtime", "dev", "inode", "mode", "uid",
+ "gid", "size", "binsha", "hexsha", "stage"):
val = getattr(entry, attr)
# END for each method
-
+
# test update
entries = index.entries
assert isinstance(index.update(), IndexFile)
assert entries is not index.entries
-
+
# test stage
index_merge = IndexFile(self.rorepo, fixture_path("index_merge"))
assert len(index_merge.entries) == 106
- assert len(list(e for e in index_merge.entries.itervalues() if e.stage != 0 ))
-
+ assert len(list(e for e in index_merge.entries.itervalues() if e.stage != 0))
+
# write the data - it must match the original
tmpfile = tempfile.mktemp()
index_merge.write(tmpfile)
@@ -90,82 +91,81 @@ class TestIndex(TestBase):
assert fp.read() == fixture("index_merge")
fp.close()
os.remove(tmpfile)
-
+
def _cmp_tree_index(self, tree, index):
# fail unless both objects contain the same paths and blobs
if isinstance(tree, str):
tree = self.rorepo.commit(tree).tree
-
+
num_blobs = 0
blist = list()
- for blob in tree.traverse(predicate = lambda e,d: e.type == "blob", branch_first=False):
- assert (blob.path,0) in index.entries
+ for blob in tree.traverse(predicate=lambda e, d: e.type == "blob", branch_first=False):
+ assert (blob.path, 0) in index.entries
blist.append(blob)
# END for each blob in tree
if len(blist) != len(index.entries):
iset = set(k[0] for k in index.entries.keys())
bset = set(b.path for b in blist)
- raise AssertionError( "CMP Failed: Missing entries in index: %s, missing in tree: %s" % (bset-iset, iset-bset) )
+ raise AssertionError("CMP Failed: Missing entries in index: %s, missing in tree: %s" %
+ (bset - iset, iset - bset))
# END assertion message
-
+
@with_rw_repo('0.1.6')
def test_index_file_from_tree(self, rw_repo):
common_ancestor_sha = "5117c9c8a4d3af19a9958677e45cda9269de1541"
cur_sha = "4b43ca7ff72d5f535134241e7c797ddc9c7a3573"
other_sha = "39f85c4358b7346fee22169da9cad93901ea9eb9"
-
- # simple index from tree
+
+ # simple index from tree
base_index = IndexFile.from_tree(rw_repo, common_ancestor_sha)
assert base_index.entries
self._cmp_tree_index(common_ancestor_sha, base_index)
-
+
# merge two trees - its like a fast-forward
two_way_index = IndexFile.from_tree(rw_repo, common_ancestor_sha, cur_sha)
assert two_way_index.entries
self._cmp_tree_index(cur_sha, two_way_index)
-
+
# merge three trees - here we have a merge conflict
three_way_index = IndexFile.from_tree(rw_repo, common_ancestor_sha, cur_sha, other_sha)
assert len(list(e for e in three_way_index.entries.values() if e.stage != 0))
-
-
+
# ITERATE BLOBS
merge_required = lambda t: t[0] != 0
merge_blobs = list(three_way_index.iter_blobs(merge_required))
assert merge_blobs
- assert merge_blobs[0][0] in (1,2,3)
+ assert merge_blobs[0][0] in (1, 2, 3)
assert isinstance(merge_blobs[0][1], Blob)
-
+
# test BlobFilter
prefix = 'lib/git'
for stage, blob in base_index.iter_blobs(BlobFilter([prefix])):
- assert blob.path.startswith(prefix)
-
-
+ assert blob.path.startswith(prefix)
+
# writing a tree should fail with an unmerged index
self.failUnlessRaises(UnmergedEntriesError, three_way_index.write_tree)
-
+
# removed unmerged entries
unmerged_blob_map = three_way_index.unmerged_blobs()
assert unmerged_blob_map
-
+
# pick the first blob at the first stage we find and use it as resolved version
- three_way_index.resolve_blobs( l[0][1] for l in unmerged_blob_map.itervalues() )
+ three_way_index.resolve_blobs(l[0][1] for l in unmerged_blob_map.itervalues())
tree = three_way_index.write_tree()
assert isinstance(tree, Tree)
num_blobs = 0
- for blob in tree.traverse(predicate=lambda item,d: item.type == "blob"):
- assert (blob.path,0) in three_way_index.entries
+ for blob in tree.traverse(predicate=lambda item, d: item.type == "blob"):
+ assert (blob.path, 0) in three_way_index.entries
num_blobs += 1
# END for each blob
assert num_blobs == len(three_way_index.entries)
-
+
@with_rw_repo('0.1.6')
def test_index_merge_tree(self, rw_repo):
- # A bit out of place, but we need a different repo for this:
+ # A bit out of place, but we need a different repo for this:
assert self.rorepo != rw_repo and not (self.rorepo == rw_repo)
assert len(set((self.rorepo, self.rorepo, rw_repo, rw_repo))) == 2
-
+
# SINGLE TREE MERGE
# current index is at the (virtual) cur_commit
next_commit = "4c39f9da792792d4e73fc3a5effde66576ae128c"
@@ -175,107 +175,106 @@ class TestIndex(TestBase):
rw_repo.index.merge_tree(next_commit)
# only one change should be recorded
assert manifest_entry.binsha != rw_repo.index.entries[manifest_key].binsha
-
+
rw_repo.index.reset(rw_repo.head)
assert rw_repo.index.entries[manifest_key].binsha == manifest_entry.binsha
-
+
# FAKE MERGE
#############
- # Add a change with a NULL sha that should conflict with next_commit. We
- # pretend there was a change, but we do not even bother adding a proper
+ # Add a change with a NULL sha that should conflict with next_commit. We
+ # pretend there was a change, but we do not even bother adding a proper
# sha for it ( which makes things faster of course )
- manifest_fake_entry = BaseIndexEntry((manifest_entry[0], "\0"*20, 0, manifest_entry[3]))
+ manifest_fake_entry = BaseIndexEntry((manifest_entry[0], "\0" * 20, 0, manifest_entry[3]))
# try write flag
self._assert_entries(rw_repo.index.add([manifest_fake_entry], write=False))
- # add actually resolves the null-hex-sha for us as a feature, but we can
+ # add actually resolves the null-hex-sha for us as a feature, but we can
# edit the index manually
assert rw_repo.index.entries[manifest_key].binsha != Object.NULL_BIN_SHA
- # must operate on the same index for this ! Its a bit problematic as
+ # must operate on the same index for this ! Its a bit problematic as
# it might confuse people
- index = rw_repo.index
+ index = rw_repo.index
index.entries[manifest_key] = IndexEntry.from_base(manifest_fake_entry)
index.write()
assert rw_repo.index.entries[manifest_key].hexsha == Diff.NULL_HEX_SHA
-
+
# write an unchanged index ( just for the fun of it )
rw_repo.index.write()
-
- # a three way merge would result in a conflict and fails as the command will
- # not overwrite any entries in our index and hence leave them unmerged. This is
+
+        # a three way merge would result in a conflict and fail as the command will
+ # not overwrite any entries in our index and hence leave them unmerged. This is
# mainly a protection feature as the current index is not yet in a tree
self.failUnlessRaises(GitCommandError, index.merge_tree, next_commit, base=parent_commit)
-
- # the only way to get the merged entries is to safe the current index away into a tree,
+
+        # the only way to get the merged entries is to save the current index away into a tree,
        # which is like a temporary commit for us. This fails as well, as the NULL sha does not
# have a corresponding object
# NOTE: missing_ok is not a kwarg anymore, missing_ok is always true
# self.failUnlessRaises(GitCommandError, index.write_tree)
-
+
# if missing objects are okay, this would work though ( they are always okay now )
tree = index.write_tree()
-
+
# now make a proper three way merge with unmerged entries
unmerged_tree = IndexFile.from_tree(rw_repo, parent_commit, tree, next_commit)
unmerged_blobs = unmerged_tree.unmerged_blobs()
assert len(unmerged_blobs) == 1 and unmerged_blobs.keys()[0] == manifest_key[0]
-
-
+
@with_rw_repo('0.1.6')
def test_index_file_diffing(self, rw_repo):
# default Index instance points to our index
index = IndexFile(rw_repo)
assert index.path is not None
assert len(index.entries)
-
+
# write the file back
index.write()
-
+
# could sha it, or check stats
-
+
# test diff
- # resetting the head will leave the index in a different state, and the
+ # resetting the head will leave the index in a different state, and the
# diff will yield a few changes
cur_head_commit = rw_repo.head.reference.commit
ref = rw_repo.head.reset('HEAD~6', index=True, working_tree=False)
-
+
# diff against same index is 0
diff = index.diff()
assert len(diff) == 0
-
+
# against HEAD as string, must be the same as it matches index
diff = index.diff('HEAD')
assert len(diff) == 0
-
+
# against previous head, there must be a difference
diff = index.diff(cur_head_commit)
assert len(diff)
-
+
# we reverse the result
adiff = index.diff(str(cur_head_commit), R=True)
odiff = index.diff(cur_head_commit, R=False) # now its not reversed anymore
assert adiff != odiff
assert odiff == diff # both unreversed diffs against HEAD
-
+
# against working copy - its still at cur_commit
wdiff = index.diff(None)
assert wdiff != adiff
assert wdiff != odiff
-
+
# against something unusual
self.failUnlessRaises(ValueError, index.diff, int)
-
+
# adjust the index to match an old revision
cur_branch = rw_repo.active_branch
cur_commit = cur_branch.commit
rev_head_parent = 'HEAD~1'
assert index.reset(rev_head_parent) is index
-
+
assert cur_branch == rw_repo.active_branch
assert cur_commit == rw_repo.head.commit
-
+
# there must be differences towards the working tree which is in the 'future'
assert index.diff(None)
-
+
        # reset the working copy as well to the current head, to pull 'back' as well
new_data = "will be reverted"
file_path = os.path.join(rw_repo.working_tree_dir, "CHANGES")
@@ -286,12 +285,12 @@ class TestIndex(TestBase):
assert not index.diff(None)
assert cur_branch == rw_repo.active_branch
assert cur_commit == rw_repo.head.commit
- fp = open(file_path,'rb')
+ fp = open(file_path, 'rb')
try:
assert fp.read() != new_data
finally:
fp.close()
-
+
# test full checkout
test_file = os.path.join(rw_repo.working_tree_dir, "CHANGES")
open(test_file, 'ab').write("some data")
@@ -299,24 +298,24 @@ class TestIndex(TestBase):
assert 'CHANGES' in list(rval)
self._assert_fprogress([None])
assert os.path.isfile(test_file)
-
+
os.remove(test_file)
rval = index.checkout(None, force=False, fprogress=self._fprogress)
assert 'CHANGES' in list(rval)
self._assert_fprogress([None])
assert os.path.isfile(test_file)
-
+
# individual file
os.remove(test_file)
rval = index.checkout(test_file, fprogress=self._fprogress)
assert list(rval)[0] == 'CHANGES'
self._assert_fprogress([test_file])
assert os.path.exists(test_file)
-
+
# checking out non-existing file throws
self.failUnlessRaises(CheckoutError, index.checkout, "doesnt_exist_ever.txt.that")
self.failUnlessRaises(CheckoutError, index.checkout, paths=["doesnt/exist"])
-
+
# checkout file with modifications
append_data = "hello"
fp = open(test_file, "ab")
@@ -331,16 +330,16 @@ class TestIndex(TestBase):
assert open(test_file).read().endswith(append_data)
else:
raise AssertionError("Exception CheckoutError not thrown")
-
+
# if we force it it should work
index.checkout(test_file, force=True)
assert not open(test_file).read().endswith(append_data)
-
+
# checkout directory
shutil.rmtree(os.path.join(rw_repo.working_tree_dir, "lib"))
rval = index.checkout('lib')
assert len(list(rval)) > 1
-
+
def _count_existing(self, repo, files):
"""
Returns count of files that actually exist in the repository directory.
@@ -352,24 +351,24 @@ class TestIndex(TestBase):
# END for each deleted file
return existing
# END num existing helper
-
+
@with_rw_repo('0.1.6')
def test_index_mutation(self, rw_repo):
index = rw_repo.index
num_entries = len(index.entries)
cur_head = rw_repo.head
-
+
uname = "Some Developer"
umail = "sd@company.com"
rw_repo.config_writer().set_value("user", "name", uname)
- rw_repo.config_writer().set_value("user", "email", umail)
-
- # remove all of the files, provide a wild mix of paths, BaseIndexEntries,
+ rw_repo.config_writer().set_value("user", "email", umail)
+
+ # remove all of the files, provide a wild mix of paths, BaseIndexEntries,
# IndexEntries
def mixed_iterator():
count = 0
for entry in index.entries.itervalues():
- type_id = count % 4
+ type_id = count % 4
if type_id == 0: # path
yield entry.path
elif type_id == 1: # blob
@@ -381,39 +380,39 @@ class TestIndex(TestBase):
else:
raise AssertionError("Invalid Type")
count += 1
- # END for each entry
+ # END for each entry
# END mixed iterator
deleted_files = index.remove(mixed_iterator(), working_tree=False)
assert deleted_files
assert self._count_existing(rw_repo, deleted_files) == len(deleted_files)
assert len(index.entries) == 0
-
+
# reset the index to undo our changes
index.reset()
assert len(index.entries) == num_entries
-
+
# remove with working copy
deleted_files = index.remove(mixed_iterator(), working_tree=True)
assert deleted_files
assert self._count_existing(rw_repo, deleted_files) == 0
-
+
# reset everything
index.reset(working_tree=True)
assert self._count_existing(rw_repo, deleted_files) == len(deleted_files)
-
+
# invalid type
self.failUnlessRaises(TypeError, index.remove, [1])
-
+
# absolute path
- deleted_files = index.remove([os.path.join(rw_repo.working_tree_dir,"lib")], r=True)
+ deleted_files = index.remove([os.path.join(rw_repo.working_tree_dir, "lib")], r=True)
assert len(deleted_files) > 1
self.failUnlessRaises(ValueError, index.remove, ["/doesnt/exists"])
-
+
# TEST COMMITTING
# commit changed index
cur_commit = cur_head.commit
commit_message = "commit default head"
-
+
new_commit = index.commit(commit_message, head=False)
assert cur_commit != new_commit
assert new_commit.author.name == uname
@@ -424,74 +423,77 @@ class TestIndex(TestBase):
assert new_commit.parents[0] == cur_commit
assert len(new_commit.parents) == 1
assert cur_head.commit == cur_commit
-
+
# same index, no parents
commit_message = "index without parents"
commit_no_parents = index.commit(commit_message, parent_commits=list(), head=True)
assert commit_no_parents.message == commit_message
assert len(commit_no_parents.parents) == 0
assert cur_head.commit == commit_no_parents
-
+
# same index, multiple parents
commit_message = "Index with multiple parents\n commit with another line"
- commit_multi_parent = index.commit(commit_message,parent_commits=(commit_no_parents, new_commit))
+ commit_multi_parent = index.commit(commit_message, parent_commits=(commit_no_parents, new_commit))
assert commit_multi_parent.message == commit_message
assert len(commit_multi_parent.parents) == 2
assert commit_multi_parent.parents[0] == commit_no_parents
assert commit_multi_parent.parents[1] == new_commit
assert cur_head.commit == commit_multi_parent
-
+
# re-add all files in lib
# get the lib folder back on disk, but get an index without it
index.reset(new_commit.parents[0], working_tree=True).reset(new_commit, working_tree=False)
lib_file_path = os.path.join("lib", "git", "__init__.py")
assert (lib_file_path, 0) not in index.entries
assert os.path.isfile(os.path.join(rw_repo.working_tree_dir, lib_file_path))
-
+
# directory
entries = index.add(['lib'], fprogress=self._fprogress_add)
self._assert_entries(entries)
self._assert_fprogress(entries)
- assert len(entries)>1
-
- # glob
+ assert len(entries) > 1
+
+ # glob
entries = index.reset(new_commit).add([os.path.join('lib', 'git', '*.py')], fprogress=self._fprogress_add)
self._assert_entries(entries)
self._assert_fprogress(entries)
assert len(entries) == 14
-
- # same file
- entries = index.reset(new_commit).add([os.path.abspath(os.path.join('lib', 'git', 'head.py'))]*2, fprogress=self._fprogress_add)
+
+ # same file
+ entries = index.reset(new_commit).add(
+ [os.path.abspath(os.path.join('lib', 'git', 'head.py'))] * 2, fprogress=self._fprogress_add)
self._assert_entries(entries)
assert entries[0].mode & 0644 == 0644
# would fail, test is too primitive to handle this case
# self._assert_fprogress(entries)
self._reset_progress()
assert len(entries) == 2
-
+
# missing path
self.failUnlessRaises(OSError, index.reset(new_commit).add, ['doesnt/exist/must/raise'])
-
+
# blob from older revision overrides current index revision
old_blob = new_commit.parents[0].tree.blobs[0]
entries = index.reset(new_commit).add([old_blob], fprogress=self._fprogress_add)
self._assert_entries(entries)
self._assert_fprogress(entries)
- assert index.entries[(old_blob.path,0)].hexsha == old_blob.hexsha and len(entries) == 1
-
+ assert index.entries[(old_blob.path, 0)].hexsha == old_blob.hexsha and len(entries) == 1
+
# mode 0 not allowed
null_hex_sha = Diff.NULL_HEX_SHA
null_bin_sha = "\0" * 20
- self.failUnlessRaises(ValueError, index.reset(new_commit).add, [BaseIndexEntry((0, null_bin_sha,0,"doesntmatter"))])
-
+ self.failUnlessRaises(ValueError, index.reset(
+ new_commit).add, [BaseIndexEntry((0, null_bin_sha, 0, "doesntmatter"))])
+
# add new file
new_file_relapath = "my_new_file"
new_file_path = self._make_file(new_file_relapath, "hello world", rw_repo)
- entries = index.reset(new_commit).add([BaseIndexEntry((010644, null_bin_sha, 0, new_file_relapath))], fprogress=self._fprogress_add)
+ entries = index.reset(new_commit).add(
+ [BaseIndexEntry((010644, null_bin_sha, 0, new_file_relapath))], fprogress=self._fprogress_add)
self._assert_entries(entries)
self._assert_fprogress(entries)
assert len(entries) == 1 and entries[0].hexsha != null_hex_sha
-
+
# add symlink
if sys.platform != "win32":
basename = "my_real_symlink"
@@ -503,11 +505,11 @@ class TestIndex(TestBase):
self._assert_fprogress(entries)
assert len(entries) == 1 and S_ISLNK(entries[0].mode)
assert S_ISLNK(index.entries[index.entry_key("my_real_symlink", 0)].mode)
-
+
# we expect only the target to be written
assert index.repo.odb.stream(entries[0].binsha).read() == target
- # END real symlink test
-
+ # END real symlink test
+
        # add fake symlink and assure it checks out as a symlink
fake_symlink_relapath = "my_fake_symlink"
link_target = "/etc/that"
@@ -518,83 +520,83 @@ class TestIndex(TestBase):
self._assert_fprogress(entries)
assert entries[0].hexsha != null_hex_sha
assert len(entries) == 1 and S_ISLNK(entries[0].mode)
-
+
# assure this also works with an alternate method
full_index_entry = IndexEntry.from_base(BaseIndexEntry((0120000, entries[0].binsha, 0, entries[0].path)))
entry_key = index.entry_key(full_index_entry)
index.reset(new_commit)
-
+
assert entry_key not in index.entries
index.entries[entry_key] = full_index_entry
index.write()
index.update() # force reread of entries
new_entry = index.entries[entry_key]
assert S_ISLNK(new_entry.mode)
-
+
# a tree created from this should contain the symlink
tree = index.write_tree()
assert fake_symlink_relapath in tree
index.write() # flush our changes for the checkout
-
+
# checkout the fakelink, should be a link then
assert not S_ISLNK(os.stat(fake_symlink_path)[ST_MODE])
os.remove(fake_symlink_path)
index.checkout(fake_symlink_path)
-
+
# on windows we will never get symlinks
if os.name == 'nt':
- # simlinks should contain the link as text ( which is what a
+ # simlinks should contain the link as text ( which is what a
# symlink actually is )
- open(fake_symlink_path,'rb').read() == link_target
+ open(fake_symlink_path, 'rb').read() == link_target
else:
assert S_ISLNK(os.lstat(fake_symlink_path)[ST_MODE])
-
+
# TEST RENAMING
def assert_mv_rval(rval):
for source, dest in rval:
assert not os.path.exists(source) and os.path.exists(dest)
# END for each renamed item
# END move assertion utility
-
+
self.failUnlessRaises(ValueError, index.move, ['just_one_path'])
# file onto existing file
files = ['AUTHORS', 'LICENSE']
self.failUnlessRaises(GitCommandError, index.move, files)
-
- # again, with force
+
+ # again, with force
assert_mv_rval(index.move(files, f=True))
-
+
# files into directory - dry run
paths = ['LICENSE', 'VERSION', 'doc']
rval = index.move(paths, dry_run=True)
assert len(rval) == 2
assert os.path.exists(paths[0])
-
+
# again, no dry run
rval = index.move(paths)
assert_mv_rval(rval)
-
+
# dir into dir
rval = index.move(['doc', 'test'])
assert_mv_rval(rval)
-
-
+
# TEST PATH REWRITING
######################
count = [0]
+
def rewriter(entry):
rval = str(count[0])
count[0] += 1
return rval
# END rewriter
-
+
def make_paths():
# two existing ones, one new one
yield 'CHANGES'
yield 'ez_setup.py'
yield index.entries[index.entry_key('README', 0)]
yield index.entries[index.entry_key('.gitignore', 0)]
-
+
for fid in range(3):
fname = 'newfile%i' % fid
open(fname, 'wb').write("abcd")
@@ -603,11 +605,10 @@ class TestIndex(TestBase):
# END path producer
paths = list(make_paths())
self._assert_entries(index.add(paths, path_rewriter=rewriter))
-
+
for filenum in range(len(paths)):
assert index.entry_key(str(filenum), 0) in index.entries
-
-
+
# TEST RESET ON PATHS
######################
arela = "aa"
@@ -619,34 +620,33 @@ class TestIndex(TestBase):
keys = (akey, bkey)
absfiles = (afile, bfile)
files = (arela, brela)
-
+
for fkey in keys:
assert not fkey in index.entries
-
+
index.add(files, write=True)
nc = index.commit("2 files committed", head=False)
-
+
for fkey in keys:
assert fkey in index.entries
-
+
# just the index
index.reset(paths=(arela, afile))
assert not akey in index.entries
assert bkey in index.entries
-
+
# now with working tree - files on disk as well as entries must be recreated
rw_repo.head.commit = nc
for absfile in absfiles:
os.remove(absfile)
-
+
index.reset(working_tree=True, paths=files)
-
- for fkey in keys:
+
+ for fkey in keys:
assert fkey in index.entries
for absfile in absfiles:
assert os.path.isfile(absfile)
-
-
+
@with_rw_repo('HEAD')
def test_compare_write_tree(self, rw_repo):
# write all trees and compare them
@@ -660,16 +660,14 @@ class TestIndex(TestBase):
index = rw_repo.index.reset(commit)
orig_tree = commit.tree
assert index.write_tree() == orig_tree
- # END for each commit
-
+ # END for each commit
+
def test_index_new(self):
B = self.rorepo.tree("6d9b1f4f9fa8c9f030e3207e7deacc5d5f8bba4e")
H = self.rorepo.tree("25dca42bac17d511b7e2ebdd9d1d679e7626db5f")
M = self.rorepo.tree("e746f96bcc29238b79118123028ca170adc4ff0f")
-
- for args in ((B,), (B,H), (B,H,M)):
+
+ for args in ((B,), (B, H), (B, H, M)):
index = IndexFile.new(self.rorepo, *args)
assert isinstance(index, IndexFile)
# END for each arg tuple
-
-
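
The three-way merge workflow exercised above condenses to a few calls: build an
index from base, ours and theirs, resolve each conflicted path, then write the
result as a tree. A sketch assuming repo and the three shas name real commits
in a repository:

    from git import IndexFile

    index = IndexFile.from_tree(repo, base_sha, ours_sha, theirs_sha)
    unmerged = index.unmerged_blobs()       # path -> [(stage, Blob), ...]
    # naive resolution: keep the first version recorded for every path
    index.resolve_blobs(blobs[0][1] for blobs in unmerged.itervalues())
    tree = index.write_tree()               # raises UnmergedEntriesError if unresolved
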
diff --git a/git/test/test_pack.py b/git/test/test_pack.py
index 1c308689..665a0226 100644
--- a/git/test/test_pack.py
+++ b/git/test/test_pack.py
@@ -4,23 +4,23 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test everything about packs reading and writing"""
from lib import (
- TestBase,
- with_rw_directory,
- with_packs_rw,
- fixture_path
- )
+ TestBase,
+ with_rw_directory,
+ with_packs_rw,
+ fixture_path
+)
from git.stream import DeltaApplyReader
from git.pack import (
- PackEntity,
- PackIndexFile,
- PackFile
- )
+ PackEntity,
+ PackIndexFile,
+ PackFile
+)
from git.base import (
- OInfo,
- OStream,
- )
+ OInfo,
+ OStream,
+)
from git.fun import delta_types
from git.exc import UnsupportedOperation
@@ -38,16 +38,17 @@ def bin_sha_from_filename(filename):
return to_bin_sha(os.path.splitext(os.path.basename(filename))[0][5:])
#} END utilities
+
class TestPack(TestBase):
-
+
packindexfile_v1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.idx'), 1, 67)
packindexfile_v2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.idx'), 2, 30)
packindexfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.idx'), 2, 42)
packfile_v2_1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.pack'), 2, packindexfile_v1[2])
packfile_v2_2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack'), 2, packindexfile_v2[2])
- packfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.pack'), 2, packindexfile_v2_3_ascii[2])
-
-
+ packfile_v2_3_ascii = (
+ fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.pack'), 2, packindexfile_v2_3_ascii[2])
+
def _assert_index_file(self, index, version, size):
assert index.packfile_checksum() != index.indexfile_checksum()
assert len(index.packfile_checksum()) == 20
@@ -55,102 +56,99 @@ class TestPack(TestBase):
assert index.version() == version
assert index.size() == size
assert len(index.offsets()) == size
-
+
# get all data of all objects
for oidx in xrange(index.size()):
sha = index.sha(oidx)
assert oidx == index.sha_to_index(sha)
-
+
entry = index.entry(oidx)
assert len(entry) == 3
-
+
assert entry[0] == index.offset(oidx)
assert entry[1] == sha
assert entry[2] == index.crc(oidx)
-
+
# verify partial sha
- for l in (4,8,11,17,20):
- assert index.partial_sha_to_index(sha[:l], l*2) == oidx
-
+ for l in (4, 8, 11, 17, 20):
+ assert index.partial_sha_to_index(sha[:l], l * 2) == oidx
+
# END for each object index in indexfile
self.failUnlessRaises(ValueError, index.partial_sha_to_index, "\0", 2)
-
-
+
def _assert_pack_file(self, pack, version, size):
assert pack.version() == 2
assert pack.size() == size
assert len(pack.checksum()) == 20
-
+
num_obj = 0
for obj in pack.stream_iter():
num_obj += 1
info = pack.info(obj.pack_offset)
stream = pack.stream(obj.pack_offset)
-
+
assert info.pack_offset == stream.pack_offset
assert info.type_id == stream.type_id
assert hasattr(stream, 'read')
-
+
# it should be possible to read from both streams
assert obj.read() == stream.read()
-
+
streams = pack.collect_streams(obj.pack_offset)
assert streams
-
+
# read the stream
try:
dstream = DeltaApplyReader.new(streams)
except ValueError:
- # ignore these, old git versions use only ref deltas,
+ # ignore these, old git versions use only ref deltas,
            # which we haven't resolved ( as we are without an index )
# Also ignore non-delta streams
continue
# END get deltastream
-
+
# read all
data = dstream.read()
assert len(data) == dstream.size
-
+
# test seek
dstream.seek(0)
assert dstream.read() == data
-
-
+
# read chunks
# NOTE: the current implementation is safe, it basically transfers
# all calls to the underlying memory map
-
+
# END for each object
assert num_obj == size
-
-
+
def test_pack_index(self):
# check version 1 and 2
- for indexfile, version, size in (self.packindexfile_v1, self.packindexfile_v2):
+ for indexfile, version, size in (self.packindexfile_v1, self.packindexfile_v2):
index = PackIndexFile(indexfile)
self._assert_index_file(index, version, size)
# END run tests
-
+
def test_pack(self):
- # there is this special version 3, but apparently its like 2 ...
+        # there is this special version 3, but apparently it's like 2 ...
for packfile, version, size in (self.packfile_v2_3_ascii, self.packfile_v2_1, self.packfile_v2_2):
pack = PackFile(packfile)
self._assert_pack_file(pack, version, size)
# END for each pack to test
-
+
@with_rw_directory
def test_pack_entity(self, rw_dir):
pack_objs = list()
- for packinfo, indexinfo in ( (self.packfile_v2_1, self.packindexfile_v1),
- (self.packfile_v2_2, self.packindexfile_v2),
- (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):
+ for packinfo, indexinfo in ((self.packfile_v2_1, self.packindexfile_v1),
+ (self.packfile_v2_2, self.packindexfile_v2),
+ (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):
packfile, version, size = packinfo
indexfile, version, size = indexinfo
entity = PackEntity(packfile)
assert entity.pack().path() == packfile
assert entity.index().path() == indexfile
pack_objs.extend(entity.stream_iter())
-
+
count = 0
for info, stream in izip(entity.info_iter(), entity.stream_iter()):
count += 1
@@ -158,10 +156,10 @@ class TestPack(TestBase):
assert len(info.binsha) == 20
assert info.type_id == stream.type_id
assert info.size == stream.size
-
+
# we return fully resolved items, which is implied by the sha centric access
assert not info.type_id in delta_types
-
+
# try all calls
assert len(entity.collect_streams(info.binsha))
oinfo = entity.info(info.binsha)
@@ -170,7 +168,7 @@ class TestPack(TestBase):
ostream = entity.stream(info.binsha)
assert isinstance(ostream, OStream)
assert ostream.binsha is not None
-
+
# verify the stream
try:
assert entity.is_valid_stream(info.binsha, use_crc=True)
@@ -180,42 +178,43 @@ class TestPack(TestBase):
assert entity.is_valid_stream(info.binsha, use_crc=False)
# END for each info, stream tuple
assert count == size
-
+
# END for each entity
-
+
# pack writing - write all packs into one
# index path can be None
pack_path = tempfile.mktemp('', "pack", rw_dir)
index_path = tempfile.mktemp('', 'index', rw_dir)
iteration = 0
+
def rewind_streams():
- for obj in pack_objs:
+ for obj in pack_objs:
obj.stream.seek(0)
- #END utility
- for ppath, ipath, num_obj in zip((pack_path, )*2, (index_path, None), (len(pack_objs), None)):
+ # END utility
+ for ppath, ipath, num_obj in zip((pack_path, ) * 2, (index_path, None), (len(pack_objs), None)):
pfile = open(ppath, 'wb')
iwrite = None
if ipath:
ifile = open(ipath, 'wb')
iwrite = ifile.write
- #END handle ip
-
+ # END handle ip
+
# make sure we rewind the streams ... we work on the same objects over and over again
- if iteration > 0:
+ if iteration > 0:
rewind_streams()
- #END rewind streams
+ # END rewind streams
iteration += 1
-
+
pack_sha, index_sha = PackEntity.write_pack(pack_objs, pfile.write, iwrite, object_count=num_obj)
pfile.close()
assert os.path.getsize(ppath) > 100
-
+
# verify pack
pf = PackFile(ppath)
assert pf.size() == len(pack_objs)
assert pf.version() == PackFile.pack_version_default
assert pf.checksum() == pack_sha
-
+
# verify index
if ipath is not None:
ifile.close()
@@ -225,9 +224,9 @@ class TestPack(TestBase):
assert idx.packfile_checksum() == pack_sha
assert idx.indexfile_checksum() == index_sha
assert idx.size() == len(pack_objs)
- #END verify files exist
- #END for each packpath, indexpath pair
-
+ # END verify files exist
+ # END for each packpath, indexpath pair
+
        # verify the packs thoroughly
rewind_streams()
entity = PackEntity.create(pack_objs, rw_dir)
@@ -237,11 +236,10 @@ class TestPack(TestBase):
for use_crc in range(2):
assert entity.is_valid_stream(info.binsha, use_crc)
# END for each crc mode
- #END for each info
+ # END for each info
assert count == len(pack_objs)
-
-
+
def test_pack_64(self):
# TODO: hex-edit a pack helping us to verify that we can handle 64 byte offsets
- # of course without really needing such a huge pack
+ # of course without really needing such a huge pack
raise SkipTest()
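
The read side of these tests boils down to one loop: PackEntity pairs a .pack
with the .idx it expects to find alongside, and hands out fully delta-resolved
objects by sha. A sketch with a placeholder pack path:

    from git.pack import PackEntity

    entity = PackEntity('/path/to/pack-deadbeef.pack')  # .idx found next to it
    for info in entity.info_iter():
        assert entity.is_valid_stream(info.binsha, use_crc=True)
        data = entity.stream(info.binsha).read()        # delta chains resolved
        assert len(data) == info.size
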
diff --git a/git/test/test_remote.py b/git/test/test_remote.py
index 87fcd7fe..18cfda07 100644
--- a/git/test/test_remote.py
+++ b/git/test/test_remote.py
@@ -5,21 +5,21 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
from git.test.lib import (
- TestBase,
- with_rw_and_rw_remote_repo,
- with_rw_repo,
- )
+ TestBase,
+ with_rw_and_rw_remote_repo,
+ with_rw_repo,
+)
from git.util import IterableList
from git.db.interface import PushInfo, FetchInfo, RemoteProgress
from git.remote import *
-from git.exc import GitCommandError
+from git.exc import GitCommandError
from git.refs import (
- Reference,
- TagReference,
- RemoteReference,
- Head,
- SymbolicReference
- )
+ Reference,
+ TagReference,
+ RemoteReference,
+ Head,
+ SymbolicReference
+)
from nose import SkipTest
@@ -28,54 +28,55 @@ import shutil
import os
import random
-# assure we have repeatable results
+# assure we have repeatable results
random.seed(0)
+
class TestRemoteProgress(RemoteProgress):
- __slots__ = ( "_seen_lines", "_stages_per_op", '_num_progress_messages')
+ __slots__ = ("_seen_lines", "_stages_per_op", '_num_progress_messages')
+
def __init__(self):
super(TestRemoteProgress, self).__init__()
self._seen_lines = list()
self._stages_per_op = dict()
self._seen_ops = set()
self._num_progress_messages = 0
-
+
def line_dropped(self, line):
try:
self._seen_lines.remove(line)
except ValueError:
pass
-
+
def __call__(self, message, input=''):
pass
-
+
def update(self, op_code, cur_count, max_count=None, message='', input=''):
# check each stage only comes once
if input:
self._seen_lines.append(input)
- #END handle input
+ # END handle input
op_id = op_code & self.OP_MASK
assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING)
-
+
self._stages_per_op.setdefault(op_id, 0)
- self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK)
-
- if op_code & (self.WRITING|self.END) == (self.WRITING|self.END):
+ self._stages_per_op[op_id] = self._stages_per_op[op_id] | (op_code & self.STAGE_MASK)
+
+ if op_code & (self.WRITING | self.END) == (self.WRITING | self.END):
assert message
# END check we get message
-
+
self._num_progress_messages += 1
-
-
+
def make_assertion(self):
# we don't always receive messages
if not self._seen_lines:
return
-
+
# sometimes objects are not compressed which is okay
- assert len(self._stages_per_op.keys()) in (2,3)
+ assert len(self._stages_per_op.keys()) in (2, 3)
assert self._stages_per_op
-
+
# must have seen all stages
for op, stages in self._stages_per_op.items():
assert stages & self.STAGE_MASK == self.STAGE_MASK
@@ -83,15 +84,14 @@ class TestRemoteProgress(RemoteProgress):
def assert_received_message(self):
assert self._num_progress_messages
-
+
class TestRemote(TestBase):
-
+
def _print_fetchhead(self, repo):
fp = open(os.path.join(repo.git_dir, "FETCH_HEAD"))
fp.close()
-
-
+
def _do_test_fetch_result(self, results, remote):
# self._print_fetchhead(remote.repo)
assert len(results) > 0 and isinstance(results[0], FetchInfo)
@@ -99,15 +99,15 @@ class TestRemote(TestBase):
assert isinstance(info.note, basestring)
if isinstance(info.ref, Reference):
assert info.flags != 0
- # END reference type flags handling
+ # END reference type flags handling
assert isinstance(info.ref, (SymbolicReference, Reference))
- if info.flags & (info.FORCED_UPDATE|info.FAST_FORWARD):
+ if info.flags & (info.FORCED_UPDATE | info.FAST_FORWARD):
assert isinstance(info.old_commit_binsha, str) and len(info.old_commit_binsha) == 20
else:
assert info.old_commit_binsha is None
- # END forced update checking
+ # END forced update checking
# END for each info
-
+
def _do_test_push_result(self, results, remote):
assert len(results) > 0 and isinstance(results[0], PushInfo)
for info in results:
@@ -123,24 +123,24 @@ class TestRemote(TestBase):
assert has_one
else:
# there must be a remote commit
- if info.flags & info.DELETED == 0:
+ if info.flags & info.DELETED == 0:
assert isinstance(info.local_ref, Reference)
else:
assert info.local_ref is None
assert type(info.remote_ref) in (TagReference, RemoteReference)
# END error checking
- # END for each info
-
+ # END for each info
+
def _commit_random_file(self, repo):
- #Create a file with a random name and random data and commit it to repo.
+ # Create a file with a random name and random data and commit it to repo.
        # Return the committed absolute file path
index = repo.index
- new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo)
+ new_file = self._make_file(os.path.basename(tempfile.mktemp()), str(random.random()), repo)
index.add([new_file])
index.commit("Committing %s" % new_file)
return new_file
-
- def _do_test_fetch(self,remote, rw_repo, remote_repo):
+
+ def _do_test_fetch(self, remote, rw_repo, remote_repo):
def fetch_and_test(remote, **kwargs):
progress = TestRemoteProgress()
kwargs['progress'] = progress
@@ -149,84 +149,84 @@ class TestRemote(TestBase):
self._do_test_fetch_result(res, remote)
return res
# END fetch and check
-
+
def get_info(res, remote, name):
- return res["%s/%s"%(remote,name)]
-
+ return res["%s/%s" % (remote, name)]
+
        # put remote head to master as it is guaranteed to exist
remote_repo.head.reference = remote_repo.heads.master
-
+
res = fetch_and_test(remote)
# all uptodate
for info in res:
assert info.flags & info.HEAD_UPTODATE
-
+
# rewind remote head to trigger rejection
# index must be false as remote is a bare repo
rhead = remote_repo.head
remote_commit = rhead.commit
rhead.reset("HEAD~2", index=False)
res = fetch_and_test(remote)
- mkey = "%s/%s"%(remote,'master')
+ mkey = "%s/%s" % (remote, 'master')
master_info = res[mkey]
assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None
-
+
# normal fast forward - set head back to previous one
rhead.commit = remote_commit
res = fetch_and_test(remote)
assert res[mkey].flags & FetchInfo.FAST_FORWARD
-
+
# new remote branch
new_remote_branch = Head.create(remote_repo, "new_branch")
res = fetch_and_test(remote)
new_branch_info = get_info(res, remote, new_remote_branch)
assert new_branch_info.flags & FetchInfo.NEW_HEAD
-
+
# remote branch rename ( causes creation of a new one locally )
new_remote_branch.rename("other_branch_name")
res = fetch_and_test(remote)
other_branch_info = get_info(res, remote, new_remote_branch)
assert other_branch_info.ref.commit == new_branch_info.ref.commit
-
+
# remove new branch
Head.delete(new_remote_branch.repo, new_remote_branch)
res = fetch_and_test(remote)
# deleted remote will not be fetched
self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch)
-
+
# prune stale tracking branches
stale_refs = remote.stale_refs
assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference)
RemoteReference.delete(rw_repo, *stale_refs)
-
+
# test single branch fetch with refspec including target remote
- res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote)
+ res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master" % remote)
assert len(res) == 1 and get_info(res, remote, 'master')
-
+
        # ... with refspec and no target
res = fetch_and_test(remote, refspec='master')
assert len(res) == 1
-
+
# add new tag reference
rtag = TagReference.create(remote_repo, "1.0-RV_hello.there")
res = fetch_and_test(remote, tags=True)
tinfo = res[str(rtag)]
assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit
assert tinfo.flags & tinfo.NEW_TAG
-
+
# adjust tag commit
Reference.set_object(rtag, rhead.commit.parents[0].parents[0])
res = fetch_and_test(remote, tags=True)
tinfo = res[str(rtag)]
assert tinfo.commit == rtag.commit
assert tinfo.flags & tinfo.TAG_UPDATE
-
+
# delete remote tag - local one will stay
TagReference.delete(remote_repo, rtag)
res = fetch_and_test(remote, tags=True)
self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag))
-
- # provoke to receive actual objects to see what kind of output we have to
+
+ # provoke to receive actual objects to see what kind of output we have to
# expect. For that we need a remote transport protocol
# Create a new UN-shared repo and fetch into it after we pushed a change
# to the shared repo
@@ -234,31 +234,31 @@ class TestRemote(TestBase):
# must clone with a local path for the repo implementation not to freak out
# as it wants local paths only ( which I can understand )
other_repo = remote_repo.clone(other_repo_dir, shared=False)
- remote_repo_url = "git://localhost%s"%remote_repo.git_dir
-
+ remote_repo_url = "git://localhost%s" % remote_repo.git_dir
+
# put origin to git-url
- other_origin = other_repo.remotes.origin
+ other_origin = other_repo.remotes.origin
other_origin.config_writer.set("url", remote_repo_url)
# it automatically creates alternates as remote_repo is shared as well.
# It will use the transport though and ignore alternates when fetching
# assert not other_repo.alternates # this would fail
-
+
# assure we are in the right state
rw_repo.head.reset(remote.refs.master, working_tree=True)
try:
self._commit_random_file(rw_repo)
remote.push(rw_repo.head.reference)
-
- # here I would expect to see remote-information about packing
- # objects and so on. Unfortunately, this does not happen
+
+ # here I would expect to see remote-information about packing
+ # objects and so on. Unfortunately, this does not happen
# if we are redirecting the output - git explicitly checks for this
# and only provides progress information to ttys
res = fetch_and_test(other_origin)
finally:
shutil.rmtree(other_repo_dir)
# END test and cleanup
-
- def _verify_push_and_pull(self,remote, rw_repo, remote_repo):
+
+ def _verify_push_and_pull(self, remote, rw_repo, remote_repo):
# push our changes
lhead = rw_repo.head
lindex = rw_repo.index
@@ -266,16 +266,16 @@ class TestRemote(TestBase):
try:
lhead.reference = rw_repo.heads.master
except AttributeError:
- # if the author is on a non-master branch, the clones might not have
+ # if the author is on a non-master branch, the clones might not have
# a local master yet. We simply create it
lhead.reference = rw_repo.create_head('master')
- # END master handling
+ # END master handling
lhead.reset(remote.refs.master, working_tree=True)
-
+
# push without spec should fail ( without further configuration )
# well, works nicely
# self.failUnlessRaises(GitCommandError, remote.push)
-
+
# simple file push
self._commit_random_file(rw_repo)
progress = TestRemoteProgress()
@@ -283,25 +283,25 @@ class TestRemote(TestBase):
assert isinstance(res, IterableList)
self._do_test_push_result(res, remote)
progress.make_assertion()
-
+
# rejected - undo last commit
lhead.reset("HEAD~1")
res = remote.push(lhead.reference)
- assert res[0].flags & PushInfo.ERROR
+ assert res[0].flags & PushInfo.ERROR
assert res[0].flags & PushInfo.REJECTED
self._do_test_push_result(res, remote)
-
+
# force rejected pull
res = remote.push('+%s' % lhead.reference)
- assert res[0].flags & PushInfo.ERROR == 0
+ assert res[0].flags & PushInfo.ERROR == 0
assert res[0].flags & PushInfo.FORCED_UPDATE
self._do_test_push_result(res, remote)
-
+
# invalid refspec
res = remote.push("hellothere")
assert len(res) == 0
-
- # push new tags
+
+ # push new tags
progress = TestRemoteProgress()
to_be_updated = "my_tag.1.0RV"
new_tag = TagReference.create(rw_repo, to_be_updated)
@@ -310,26 +310,26 @@ class TestRemote(TestBase):
assert res[-1].flags & PushInfo.NEW_TAG
progress.make_assertion()
self._do_test_push_result(res, remote)
-
+
# update push new tags
# Rejection is default
new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True)
res = remote.push(tags=True)
self._do_test_push_result(res, remote)
assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR
-
+
# push force this tag
res = remote.push("+%s" % new_tag.path)
assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE
-
+
# delete tag - have to do it using refspec
res = remote.push(":%s" % new_tag.path)
self._do_test_push_result(res, remote)
assert res[0].flags & PushInfo.DELETED
- # Currently progress is not properly transferred, especially not using
+ # Currently progress is not properly transferred, especially not using
# the git daemon
# progress.assert_received_message()
-
+
# push new branch
new_head = Head.create(rw_repo, "my_new_branch")
progress = TestRemoteProgress()
@@ -337,20 +337,20 @@ class TestRemote(TestBase):
assert res[0].flags & PushInfo.NEW_HEAD
progress.make_assertion()
self._do_test_push_result(res, remote)
-
+
# delete new branch on the remote end and locally
res = remote.push(":%s" % new_head.path)
self._do_test_push_result(res, remote)
Head.delete(rw_repo, new_head)
assert res[-1].flags & PushInfo.DELETED
-
+
# --all
res = remote.push(all=True)
self._do_test_push_result(res, remote)
-
+
remote.pull('master')
-
- # cleanup - delete created tags and branches as we are in an innerloop on
+
+        # cleanup - delete created tags and branches as we are in an inner loop on
# the same repository
TagReference.delete(rw_repo, new_tag, other_tag)
remote.push(":%s" % other_tag.path)
@@ -359,29 +359,28 @@ class TestRemote(TestBase):
        # If you see this, please remind yourself that all this needs to be run
# per repository type !
raise SkipTest("todo")
-
@with_rw_and_rw_remote_repo('0.1.6')
def test_base(self, rw_repo, remote_repo):
num_remotes = 0
remote_set = set()
ran_fetch_test = False
-
+
for remote in rw_repo.remotes:
num_remotes += 1
assert remote == remote
assert str(remote) != repr(remote)
remote_set.add(remote)
remote_set.add(remote) # should already exist
-
- # REFS
+
+ # REFS
refs = remote.refs
assert refs
for ref in refs:
assert ref.remote_name == remote.name
assert ref.remote_head
# END for each ref
-
+
# OPTIONS
# cannot use 'fetch' key anymore as it is now a method
for opt in ("url", ):
@@ -389,10 +388,10 @@ class TestRemote(TestBase):
reader = remote.config_reader
assert reader.get(opt) == val
assert reader.get_value(opt, None) == val
-
+
# unable to write with a reader
self.failUnlessRaises(IOError, reader.set, opt, "test")
-
+
# change value
writer = remote.config_writer
new_val = "myval"
@@ -402,9 +401,9 @@ class TestRemote(TestBase):
assert writer.get(opt) == val
del(writer)
assert getattr(remote, opt) == val
- # END for each default option key
-
- # RENAME
+ # END for each default option key
+
+ # RENAME
other_name = "totally_other_name"
prev_name = remote.name
assert remote.rename(other_name) == remote
@@ -413,46 +412,43 @@ class TestRemote(TestBase):
for time in range(2):
assert remote.rename(prev_name).name == prev_name
# END for each rename ( back to prev_name )
-
+
# PUSH/PULL TESTING
self._verify_push_and_pull(remote, rw_repo, remote_repo)
-
+
# FETCH TESTING
- # Only for remotes - local cases are the same or less complicated
+ # Only for remotes - local cases are the same or less complicated
# as additional progress information will never be emitted
if remote.name == "daemon_origin":
self._do_test_fetch(remote, rw_repo, remote_repo)
ran_fetch_test = True
- # END fetch test
-
+ # END fetch test
+
remote.update()
# END for each remote
-
+
assert ran_fetch_test
assert num_remotes
assert num_remotes == len(remote_set)
-
+
origin = rw_repo.remote('origin')
assert origin == rw_repo.remotes.origin
-
+
@with_rw_repo('HEAD', bare=True)
def test_creation_and_removal(self, bare_rw_repo):
new_name = "test_new_one"
arg_list = (new_name, "git@server:hello.git")
- remote = Remote.create(bare_rw_repo, *arg_list )
+ remote = Remote.create(bare_rw_repo, *arg_list)
assert remote.name == "test_new_one"
assert remote in bare_rw_repo.remotes
-
+
# create same one again
self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list)
-
+
Remote.remove(bare_rw_repo, new_name)
-
+
for remote in bare_rw_repo.remotes:
if remote.name == new_name:
raise AssertionError("Remote removal failed")
# END if deleted remote matches existing remote's name
# END for each remote
-
-
-
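
In practice the progress plumbing tested above needs only one override:
update() receives an op_code whose masked bits identify the operation and its
stage. A fetch sketch, assuming repo is any non-bare repository and that the
base class provides no-op defaults for the remaining callbacks:

    from git.db.interface import RemoteProgress

    class PrintProgress(RemoteProgress):
        def update(self, op_code, cur_count, max_count=None, message='', input=''):
            print "%s / %s  %s" % (cur_count, max_count, message)

    origin = repo.remotes.origin
    for info in origin.fetch(progress=PrintProgress()):
        if info.flags & info.HEAD_UPTODATE:
            print "up to date: %s" % info.ref
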
diff --git a/git/test/test_stats.py b/git/test/test_stats.py
index 5210e7bc..c498cfa4 100644
--- a/git/test/test_stats.py
+++ b/git/test/test_stats.py
@@ -5,25 +5,26 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
from git.test.lib import (
- TestBase,
- fixture,
- assert_equal
- )
+ TestBase,
+ fixture,
+ assert_equal
+)
from git.util import Stats
+
class TestStats(TestBase):
-
+
def test__list_from_string(self):
output = fixture('diff_numstat')
stats = Stats._list_from_string(self.rorepo, output)
-
+
assert_equal(2, stats.total['files'])
assert_equal(52, stats.total['lines'])
assert_equal(29, stats.total['insertions'])
assert_equal(23, stats.total['deletions'])
-
+
assert_equal(29, stats.files["a.txt"]['insertions'])
assert_equal(18, stats.files["a.txt"]['deletions'])
-
+
assert_equal(0, stats.files["b.txt"]['insertions'])
assert_equal(5, stats.files["b.txt"]['deletions'])
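
The parser consumes "git diff --numstat" output, one
<insertions>TAB<deletions>TAB<path> line per file; the fixture's totals can be
reproduced from two such lines. A sketch, with repo standing in for any
repository object:

    from git.util import Stats

    output = "29\t18\ta.txt\n0\t5\tb.txt\n"
    stats = Stats._list_from_string(repo, output)
    assert stats.total['files'] == 2 and stats.total['lines'] == 52
    assert stats.files["a.txt"]['insertions'] == 29
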
diff --git a/git/test/test_stream.py b/git/test/test_stream.py
index 7af652b7..508038d7 100644
--- a/git/test/test_stream.py
+++ b/git/test/test_stream.py
@@ -4,24 +4,24 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""
from lib import (
- TestBase,
- DummyStream,
- Sha1Writer,
- make_bytes,
- make_object,
- fixture_path
- )
+ TestBase,
+ DummyStream,
+ Sha1Writer,
+ make_bytes,
+ make_object,
+ fixture_path
+)
from git.stream import *
from git.util import (
NULL_HEX_SHA,
hex_to_bin
- )
+)
from git.util import zlib
from git.typ import (
str_blob_type
- )
+)
from git.db.py.loose import PureLooseObjectODB
import time
@@ -29,13 +29,12 @@ import tempfile
import os
-
-
class TestStream(TestBase):
+
"""Test stream classes"""
-
- data_sizes = (15, 10000, 1000*1024+512)
-
+
+ data_sizes = (15, 10000, 1000 * 1024 + 512)
+
def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
"""Make stream tests - the orig_stream is seekable, allowing it to be
rewound and reused
@@ -43,43 +42,43 @@ class TestStream(TestBase):
:param rewind_stream: function called to rewind the stream to make it ready
for reuse"""
ns = 10
- assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata))
-
+ assert len(cdata) > ns - 1, "Data must be larger than %i, was %i" % (ns, len(cdata))
+
# read in small steps
ss = len(cdata) / ns
for i in range(ns):
data = stream.read(ss)
- chunk = cdata[i*ss:(i+1)*ss]
+ chunk = cdata[i * ss:(i + 1) * ss]
assert data == chunk
# END for each step
rest = stream.read()
if rest:
assert rest == cdata[-len(rest):]
# END handle rest
-
+
if isinstance(stream, DecompressMemMapReader):
assert len(stream.data()) == stream.compressed_bytes_read()
# END handle special type
-
+
rewind_stream(stream)
-
+
# read everything
rdata = stream.read()
assert rdata == cdata
-
+
if isinstance(stream, DecompressMemMapReader):
assert len(stream.data()) == stream.compressed_bytes_read()
# END handle special type
-
+
def test_decompress_reader(self):
for close_on_deletion in range(2):
for with_size in range(2):
for ds in self.data_sizes:
cdata = make_bytes(ds, randomize=False)
-
+
# zdata = zipped actual data
# cdata = original content data
-
+
# create reader
if with_size:
# need object data
@@ -87,7 +86,7 @@ class TestStream(TestBase):
type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
assert size == len(cdata)
assert type == str_blob_type
-
+
# even if we don't set the size, it will be set automatically on first read
test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
assert test_reader._s == len(cdata)
@@ -96,60 +95,59 @@ class TestStream(TestBase):
zdata = zlib.compress(cdata)
reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
assert reader._s == len(cdata)
- # END get reader
-
+ # END get reader
+
self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))
-
+
# put in a dummy stream for closing
dummy = DummyStream()
reader._m = dummy
-
+
assert not dummy.closed
del(reader)
assert dummy.closed == close_on_deletion
# END for each datasize
# END whether size should be used
# END whether stream should be closed when deleted
-
+
def test_sha_writer(self):
writer = Sha1Writer()
assert 2 == writer.write("hi")
assert len(writer.sha(as_hex=1)) == 40
assert len(writer.sha(as_hex=0)) == 20
-
+
# make sure it does something ;)
prev_sha = writer.sha()
writer.write("hi again")
assert writer.sha() != prev_sha
-
+
def test_compressed_writer(self):
for ds in self.data_sizes:
fd, path = tempfile.mkstemp()
ostream = FDCompressedSha1Writer(fd)
data = make_bytes(ds, randomize=False)
-
+
# for now, just a single write, code doesn't care about chunking
assert len(data) == ostream.write(data)
ostream.close()
-
+
            # it's closed already
self.failUnlessRaises(OSError, os.close, fd)
-
+
# read everything back, compare to data we zip
- fd = os.open(path, os.O_RDONLY|getattr(os, 'O_BINARY', 0))
+ fd = os.open(path, os.O_RDONLY | getattr(os, 'O_BINARY', 0))
written_data = os.read(fd, os.path.getsize(path))
assert len(written_data) == os.path.getsize(path)
os.close(fd)
assert written_data == zlib.compress(data, 1) # best speed
-
+
os.remove(path)
        # END for each data size
-
+
def test_decompress_reader_special_case(self):
odb = PureLooseObjectODB(fixture_path('objects'))
ostream = odb.stream(hex_to_bin('7bb839852ed5e3a069966281bb08d50012fb309b'))
-
+
        # if there is a bug, we will be missing one byte exactly!
data = ostream.read()
assert len(data) == ostream.size
-
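The Sha1Writer contract checked in test_sha_writer above is small enough to restate as a stand-alone sketch; plain hashlib stands in for the class here, this is not the library code itself.

    # write() returns the number of bytes consumed; sha() reflects everything
    # written so far, as 40 hex characters or 20 raw bytes.
    import hashlib

    class MiniSha1Writer(object):
        def __init__(self):
            self._sha = hashlib.sha1()

        def write(self, data):
            self._sha.update(data)
            return len(data)

        def sha(self, as_hex=False):
            return self._sha.hexdigest() if as_hex else self._sha.digest()

    w = MiniSha1Writer()
    assert w.write("hi") == 2
    assert len(w.sha(as_hex=True)) == 40 and len(w.sha()) == 20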
diff --git a/git/test/test_util.py b/git/test/test_util.py
index d2ca8bf2..66103d4c 100644
--- a/git/test/test_util.py
+++ b/git/test/test_util.py
@@ -16,69 +16,70 @@ from git.cmd import dashify
import time
from git.util import (
- to_hex_sha,
- to_bin_sha,
- NULL_HEX_SHA,
- LockedFD,
+ to_hex_sha,
+ to_bin_sha,
+ NULL_HEX_SHA,
+ LockedFD,
Actor,
IterableList
- )
+)
class TestIterableMember(object):
+
"""A member of an iterable list"""
__slots__ = ("name", "prefix_name")
-
+
def __init__(self, name):
self.name = name
self.prefix_name = name
-
+
class TestUtils(TestBase):
+
def setup(self):
self.testdict = {
"string": "42",
"int": 42,
- "array": [ 42 ],
+ "array": [42],
}
def test_it_should_dashify(self):
assert 'this-is-my-argument' == dashify('this_is_my_argument')
assert 'foo' == dashify('foo')
-
-
+
def test_lock_file(self):
my_file = tempfile.mktemp()
lock_file = LockFile(my_file)
assert not lock_file._has_lock()
# release lock we don't have - fine
lock_file._release_lock()
-
+
# get lock
lock_file._obtain_lock_or_raise()
assert lock_file._has_lock()
-
+
# concurrent access
other_lock_file = LockFile(my_file)
assert not other_lock_file._has_lock()
self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise)
-
+
lock_file._release_lock()
assert not lock_file._has_lock()
-
+
other_lock_file._obtain_lock_or_raise()
self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise)
-
+
# auto-release on destruction
del(other_lock_file)
lock_file._obtain_lock_or_raise()
lock_file._release_lock()
-
+
def test_blocking_lock_file(self):
my_file = tempfile.mktemp()
lock_file = BlockingLockFile(my_file)
lock_file._obtain_lock()
-
+
# next one waits for the lock
start = time.time()
wait_time = 0.1
@@ -86,10 +87,10 @@ class TestUtils(TestBase):
self.failUnlessRaises(IOError, wait_lock._obtain_lock)
elapsed = time.time() - start
assert elapsed <= wait_time + 0.02 # some extra time it may cost
-
+
def test_user_id(self):
assert '@' in get_user_id()
-
+
def test_parse_date(self):
# test all supported formats
def assert_rval(rval, veri_time, offset=0):
@@ -97,13 +98,13 @@ class TestUtils(TestBase):
assert isinstance(rval[0], int) and isinstance(rval[1], int)
assert rval[0] == veri_time
assert rval[1] == offset
-
+
# now that we are here, test our conversion functions as well
utctz = altz_to_utctz_str(offset)
assert isinstance(utctz, basestring)
assert utctz_to_altz(verify_utctz(utctz)) == offset
# END assert rval utility
-
+
rfc = ("Thu, 07 Apr 2005 22:13:11 +0000", 0)
iso = ("2005-04-07T22:13:11 -0200", 7200)
iso2 = ("2005-04-07 22:13:11 +0400", -14400)
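The conversion helpers asserted in the hunk above follow git's sign convention: the offset is stored in seconds west of UTC, so "-0200" maps to +7200. An illustrative re-implementation (not the git.util code) makes the round trip explicit:

    # Illustrative stand-ins for utctz_to_altz / altz_to_utctz_str;
    # note the inverted sign between the tz string and the altz value.
    def utctz_to_altz_sketch(utctz):
        sign = -1 if utctz[0] == '+' else 1
        return sign * (int(utctz[1:3]) * 3600 + int(utctz[3:5]) * 60)

    def altz_to_utctz_str_sketch(altz):
        hours, minutes = divmod(abs(altz) // 60, 60)
        return "%s%02i%02i" % ("-" if altz > 0 else "+", hours, minutes)

    assert utctz_to_altz_sketch("-0200") == 7200        # the iso case above
    assert altz_to_utctz_str_sketch(-14400) == "+0400"  # the iso2 case above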
@@ -114,32 +115,32 @@ class TestUtils(TestBase):
for date, offset in (rfc, iso, iso2, iso3, alt, alt2):
assert_rval(parse_date(date), veri_time, offset)
# END for each date type
-
+
# and failure
self.failUnlessRaises(ValueError, parse_date, 'invalid format')
self.failUnlessRaises(ValueError, parse_date, '123456789 -02000')
self.failUnlessRaises(ValueError, parse_date, ' 123456789 -0200')
-
+
def test_actor(self):
for cr in (None, self.rorepo.config_reader()):
assert isinstance(Actor.committer(cr), Actor)
assert isinstance(Actor.author(cr), Actor)
- #END assure config reader is handled
-
+ # END assure config reader is handled
+
def test_basics(self):
assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA
assert len(to_bin_sha(NULL_HEX_SHA)) == 20
assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA
-
+
def _cmp_contents(self, file_path, data):
- # raise if data from file at file_path
+ # raise if data from file at file_path
# does not match data string
fp = open(file_path, "rb")
try:
assert fp.read() == data
finally:
fp.close()
-
+
def test_lockedfd(self):
my_file = tempfile.mktemp()
orig_data = "hello"
@@ -147,43 +148,42 @@ class TestUtils(TestBase):
my_file_fp = open(my_file, "wb")
my_file_fp.write(orig_data)
my_file_fp.close()
-
+
try:
lfd = LockedFD(my_file)
- lockfilepath = lfd._lockfilepath()
-
+ lockfilepath = lfd._lockfilepath()
+
# cannot end before it was started
self.failUnlessRaises(AssertionError, lfd.rollback)
self.failUnlessRaises(AssertionError, lfd.commit)
-
+
# open for writing
assert not os.path.isfile(lockfilepath)
wfd = lfd.open(write=True)
assert lfd._fd is wfd
assert os.path.isfile(lockfilepath)
-
+
# write data and fail
os.write(wfd, new_data)
lfd.rollback()
assert lfd._fd is None
self._cmp_contents(my_file, orig_data)
assert not os.path.isfile(lockfilepath)
-
+
            # additional call doesn't fail
lfd.commit()
lfd.rollback()
-
+
# test reading
lfd = LockedFD(my_file)
rfd = lfd.open(write=False)
assert os.read(rfd, len(orig_data)) == orig_data
-
+
assert os.path.isfile(lockfilepath)
# deletion rolls back
del(lfd)
assert not os.path.isfile(lockfilepath)
-
-
+
# write data - concurrently
lfd = LockedFD(my_file)
olfd = LockedFD(my_file)
@@ -192,17 +192,17 @@ class TestUtils(TestBase):
assert os.path.isfile(lockfilepath)
# another one fails
self.failUnlessRaises(IOError, olfd.open)
-
+
wfdstream.write(new_data)
lfd.commit()
assert not os.path.isfile(lockfilepath)
self._cmp_contents(my_file, new_data)
-
+
# could test automatic _end_writing on destruction
finally:
os.remove(my_file)
# END final cleanup
-
+
# try non-existing file for reading
lfd = LockedFD(tempfile.mktemp())
try:
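Stripped of the test scaffolding, the LockedFD protocol exercised in the hunks above is short. This condensed sketch uses only calls the test itself makes (open, commit, rollback), against a throwaway temp file:

    # open(write=True) creates <path>.lock and returns an os-level fd;
    # commit() atomically replaces the target with the lock file, while
    # rollback() or destruction discards it and keeps the original content.
    import os
    import tempfile
    from git.util import LockedFD

    path = tempfile.mktemp()
    open(path, "wb").write("hello")

    lfd = LockedFD(path)
    wfd = lfd.open(write=True)   # lock file now exists on disk
    os.write(wfd, "world")
    lfd.commit()                 # lock file moved over `path`
    assert open(path, "rb").read() == "world"
    os.remove(path)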
@@ -216,37 +216,37 @@ class TestUtils(TestBase):
def test_iterable_list(self):
for args in (('name',), ('name', 'prefix_')):
l = IterableList('name')
-
+
m1 = TestIterableMember('one')
m2 = TestIterableMember('two')
-
+
l.extend((m1, m2))
-
+
assert len(l) == 2
-
+
# contains works with name and identity
assert m1.name in l
assert m2.name in l
assert m2 in l
assert m2 in l
assert 'invalid' not in l
-
+
# with string index
assert l[m1.name] is m1
assert l[m2.name] is m2
-
+
# with int index
assert l[0] is m1
assert l[1] is m2
-
+
# with getattr
assert l.one is m1
assert l.two is m2
-
+
# test exceptions
self.failUnlessRaises(AttributeError, getattr, l, 'something')
self.failUnlessRaises(IndexError, l.__getitem__, 'something')
-
+
# delete by name and index
self.failUnlessRaises(IndexError, l.__delitem__, 'something')
del(l[m2.name])
@@ -255,21 +255,22 @@ class TestUtils(TestBase):
del(l[0])
assert m1.name not in l
assert len(l) == 0
-
+
self.failUnlessRaises(IndexError, l.__delitem__, 0)
self.failUnlessRaises(IndexError, l.__delitem__, 'something')
- #END for each possible mode
-
+ # END for each possible mode
+
class TestActor(TestBase):
+
def test_from_string_should_separate_name_and_email(self):
a = Actor._from_string("Michael Trier <mtrier@example.com>")
assert "Michael Trier" == a.name
assert "mtrier@example.com" == a.email
-
+
# base type capabilities
assert a == a
- assert not ( a != a )
+ assert not (a != a)
m = set()
m.add(a)
m.add(a)
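Finally, the name/email split asserted here is the common case of Actor._from_string; a bare-regex stand-in (not the library's more forgiving parser) captures it:

    # Stand-in for the "Name <email>" split; the real Actor._from_string
    # also copes with degenerate inputs this one-liner does not.
    import re

    def parse_actor_sketch(text):
        m = re.match(r'(.*?) <(.*)>$', text)
        return (m.group(1), m.group(2)) if m else (text, None)

    name, email = parse_actor_sketch("Michael Trier <mtrier@example.com>")
    assert (name, email) == ("Michael Trier", "mtrier@example.com")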