diff options
Diffstat (limited to 'gitdb/test/db')
| -rw-r--r-- | gitdb/test/db/lib.py | 149 | ||||
| -rw-r--r-- | gitdb/test/db/test_git.py | 33 | ||||
| -rw-r--r-- | gitdb/test/db/test_loose.py | 21 | ||||
| -rw-r--r-- | gitdb/test/db/test_mem.py | 23 | ||||
| -rw-r--r-- | gitdb/test/db/test_pack.py | 41 | ||||
| -rw-r--r-- | gitdb/test/db/test_ref.py | 35 |
6 files changed, 117 insertions, 185 deletions
diff --git a/gitdb/test/db/lib.py b/gitdb/test/db/lib.py index 62614ee..af6d9e0 100644 --- a/gitdb/test/db/lib.py +++ b/gitdb/test/db/lib.py @@ -6,63 +6,66 @@ from gitdb.test.lib import ( with_rw_directory, with_packs_rw, - ZippedStoreShaWriter, fixture_path, TestBase - ) +) -from gitdb.stream import Sha1Writer +from gitdb.stream import ( + Sha1Writer, + ZippedStoreShaWriter +) from gitdb.base import ( - IStream, - OStream, - OInfo - ) - + IStream, + OStream, + OInfo +) + from gitdb.exc import BadObject from gitdb.typ import str_blob_type +from gitdb.utils.compat import xrange + +from io import BytesIO -from async import IteratorReader -from cStringIO import StringIO from struct import pack __all__ = ('TestDBBase', 'with_rw_directory', 'with_packs_rw', 'fixture_path') - + class TestDBBase(TestBase): """Base class providing testing routines on databases""" - + # data - two_lines = "1234\nhello world" + two_lines = b'1234\nhello world' all_data = (two_lines, ) - + def _assert_object_writing_simple(self, db): # write a bunch of objects and query their streams and info null_objs = db.size() ni = 250 for i in xrange(ni): data = pack(">L", i) - istream = IStream(str_blob_type, len(data), StringIO(data)) + istream = IStream(str_blob_type, len(data), BytesIO(data)) new_istream = db.store(istream) assert new_istream is istream assert db.has_object(istream.binsha) - + info = db.info(istream.binsha) assert isinstance(info, OInfo) assert info.type == istream.type and info.size == istream.size - + stream = db.stream(istream.binsha) assert isinstance(stream, OStream) assert stream.binsha == info.binsha and stream.type == info.type assert stream.read() == data # END for each item - + assert db.size() == null_objs + ni shas = list(db.sha_iter()) assert len(shas) == db.size() assert len(shas[0]) == 20 - - + + def _assert_object_writing(self, db): """General tests to verify object writing, compatible to ObjectDBW **Note:** requires write access to the database""" @@ -76,25 +79,24 @@ class TestDBBase(TestBase): ostream = ostreamcls() assert isinstance(ostream, Sha1Writer) # END create ostream - + prev_ostream = db.set_ostream(ostream) - assert type(prev_ostream) in ostreams or prev_ostream in ostreams - - istream = IStream(str_blob_type, len(data), StringIO(data)) - + assert type(prev_ostream) in ostreams or prev_ostream in ostreams + istream = IStream(str_blob_type, len(data), BytesIO(data)) + # store returns same istream instance, with new sha set my_istream = db.store(istream) sha = istream.binsha assert my_istream is istream assert db.has_object(sha) != dry_run - assert len(sha) == 20 - + assert len(sha) == 20 + # verify data - the slow way, we want to run code if not dry_run: info = db.info(sha) assert str_blob_type == info.type assert info.size == len(data) - + ostream = db.stream(sha) assert ostream.read() == data assert ostream.type == str_blob_type @@ -102,107 +104,26 @@ class TestDBBase(TestBase): else: self.failUnlessRaises(BadObject, db.info, sha) self.failUnlessRaises(BadObject, db.stream, sha) - + # DIRECT STREAM COPY # our data hase been written in object format to the StringIO # we pasesd as output stream. No physical database representation # was created. - # Test direct stream copy of object streams, the result must be + # Test direct stream copy of object streams, the result must be # identical to what we fed in ostream.seek(0) istream.stream = ostream assert istream.binsha is not None prev_sha = istream.binsha - + db.set_ostream(ZippedStoreShaWriter()) db.store(istream) assert istream.binsha == prev_sha new_ostream = db.ostream() - + # note: only works as long our store write uses the same compression # level, which is zip_best assert ostream.getvalue() == new_ostream.getvalue() # END for each data set # END for each dry_run mode - - def _assert_object_writing_async(self, db): - """Test generic object writing using asynchronous access""" - ni = 5000 - def istream_generator(offset=0, ni=ni): - for data_src in xrange(ni): - data = str(data_src + offset) - yield IStream(str_blob_type, len(data), StringIO(data)) - # END for each item - # END generator utility - - # for now, we are very trusty here as we expect it to work if it worked - # in the single-stream case - - # write objects - reader = IteratorReader(istream_generator()) - istream_reader = db.store_async(reader) - istreams = istream_reader.read() # read all - assert istream_reader.task().error() is None - assert len(istreams) == ni - - for stream in istreams: - assert stream.error is None - assert len(stream.binsha) == 20 - assert isinstance(stream, IStream) - # END assert each stream - - # test has-object-async - we must have all previously added ones - reader = IteratorReader( istream.binsha for istream in istreams ) - hasobject_reader = db.has_object_async(reader) - count = 0 - for sha, has_object in hasobject_reader: - assert has_object - count += 1 - # END for each sha - assert count == ni - - # read the objects we have just written - reader = IteratorReader( istream.binsha for istream in istreams ) - ostream_reader = db.stream_async(reader) - - # read items individually to prevent hitting possible sys-limits - count = 0 - for ostream in ostream_reader: - assert isinstance(ostream, OStream) - count += 1 - # END for each ostream - assert ostream_reader.task().error() is None - assert count == ni - - # get info about our items - reader = IteratorReader( istream.binsha for istream in istreams ) - info_reader = db.info_async(reader) - - count = 0 - for oinfo in info_reader: - assert isinstance(oinfo, OInfo) - count += 1 - # END for each oinfo instance - assert count == ni - - - # combined read-write using a converter - # add 2500 items, and obtain their output streams - nni = 2500 - reader = IteratorReader(istream_generator(offset=ni, ni=nni)) - istream_to_sha = lambda istreams: [ istream.binsha for istream in istreams ] - - istream_reader = db.store_async(reader) - istream_reader.set_post_cb(istream_to_sha) - - ostream_reader = db.stream_async(istream_reader) - - count = 0 - # read it individually, otherwise we might run into the ulimit - for ostream in ostream_reader: - assert isinstance(ostream, OStream) - count += 1 - # END for each ostream - assert count == nni - - + diff --git a/gitdb/test/db/test_git.py b/gitdb/test/db/test_git.py index 1ef577a..e141c2b 100644 --- a/gitdb/test/db/test_git.py +++ b/gitdb/test/db/test_git.py @@ -2,46 +2,51 @@ # # This module is part of GitDB and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php -from lib import * +from gitdb.test.db.lib import ( + TestDBBase, + fixture_path, + with_rw_directory +) from gitdb.exc import BadObject from gitdb.db import GitDB from gitdb.base import OStream, OInfo from gitdb.util import hex_to_bin, bin_to_hex - + class TestGitDB(TestDBBase): - + def test_reading(self): gdb = GitDB(fixture_path('../../../.git/objects')) - + # we have packs and loose objects, alternates doesn't necessarily exist assert 1 < len(gdb.databases()) < 4 - + # access should be possible gitdb_sha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976") assert isinstance(gdb.info(gitdb_sha), OInfo) assert isinstance(gdb.stream(gitdb_sha), OStream) - assert gdb.size() > 200 + ni = 50 + assert gdb.size() >= ni sha_list = list(gdb.sha_iter()) assert len(sha_list) == gdb.size() - - - # This is actually a test for compound functionality, but it doesn't + sha_list = sha_list[:ni] # speed up tests ... + + + # This is actually a test for compound functionality, but it doesn't # have a separate test module # test partial shas # this one as uneven and quite short assert gdb.partial_to_complete_sha_hex('155b6') == hex_to_bin("155b62a9af0aa7677078331e111d0f7aa6eb4afc") - + # mix even/uneven hexshas for i, binsha in enumerate(sha_list): assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8-(i%2)]) == binsha # END for each sha - + self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, "0000") - + @with_rw_directory def test_writing(self, path): gdb = GitDB(path) - + # its possible to write objects self._assert_object_writing(gdb) - self._assert_object_writing_async(gdb) diff --git a/gitdb/test/db/test_loose.py b/gitdb/test/db/test_loose.py index d7e1d01..1d6af9c 100644 --- a/gitdb/test/db/test_loose.py +++ b/gitdb/test/db/test_loose.py @@ -2,33 +2,34 @@ # # This module is part of GitDB and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php -from lib import * +from gitdb.test.db.lib import ( + TestDBBase, + with_rw_directory +) from gitdb.db import LooseObjectDB from gitdb.exc import BadObject from gitdb.util import bin_to_hex - + class TestLooseDB(TestDBBase): - + @with_rw_directory def test_basics(self, path): ldb = LooseObjectDB(path) - + # write data self._assert_object_writing(ldb) - self._assert_object_writing_async(ldb) - + # verify sha iteration and size shas = list(ldb.sha_iter()) assert shas and len(shas[0]) == 20 - + assert len(shas) == ldb.size() - + # verify find short object long_sha = bin_to_hex(shas[-1]) for short_sha in (long_sha[:20], long_sha[:5]): assert bin_to_hex(ldb.partial_to_complete_sha_hex(short_sha)) == long_sha # END for each sha - + self.failUnlessRaises(BadObject, ldb.partial_to_complete_sha_hex, '0000') # raises if no object could be foudn - diff --git a/gitdb/test/db/test_mem.py b/gitdb/test/db/test_mem.py index df428e2..97f7217 100644 --- a/gitdb/test/db/test_mem.py +++ b/gitdb/test/db/test_mem.py @@ -2,29 +2,32 @@ # # This module is part of GitDB and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php -from lib import * +from gitdb.test.db.lib import ( + TestDBBase, + with_rw_directory +) from gitdb.db import ( - MemoryDB, - LooseObjectDB - ) - + MemoryDB, + LooseObjectDB +) + class TestMemoryDB(TestDBBase): - + @with_rw_directory def test_writing(self, path): mdb = MemoryDB() - + # write data self._assert_object_writing_simple(mdb) - + # test stream copy ldb = LooseObjectDB(path) assert ldb.size() == 0 num_streams_copied = mdb.stream_copy(mdb.sha_iter(), ldb) assert num_streams_copied == mdb.size() - + assert ldb.size() == mdb.size() for sha in mdb.sha_iter(): assert ldb.has_object(sha) - assert ldb.stream(sha).read() == mdb.stream(sha).read() + assert ldb.stream(sha).read() == mdb.stream(sha).read() # END verify objects where copied and are equal diff --git a/gitdb/test/db/test_pack.py b/gitdb/test/db/test_pack.py index f4cb5bb..963a71a 100644 --- a/gitdb/test/db/test_pack.py +++ b/gitdb/test/db/test_pack.py @@ -2,9 +2,12 @@ # # This module is part of GitDB and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php -from lib import * +from gitdb.test.db.lib import ( + TestDBBase, + with_rw_directory, + with_packs_rw +) from gitdb.db import PackedDB -from gitdb.test.lib import fixture_path from gitdb.exc import BadObject, AmbiguousObjectName @@ -12,45 +15,45 @@ import os import random class TestPackDB(TestDBBase): - + @with_rw_directory @with_packs_rw def test_writing(self, path): pdb = PackedDB(path) - + # on demand, we init our pack cache num_packs = len(pdb.entities()) assert pdb._st_mtime != 0 - - # test pack directory changed: + + # test pack directory changed: # packs removed - rename a file, should affect the glob pack_path = pdb.entities()[0].pack().path() new_pack_path = pack_path + "renamed" os.rename(pack_path, new_pack_path) - + pdb.update_cache(force=True) assert len(pdb.entities()) == num_packs - 1 - + # packs added os.rename(new_pack_path, pack_path) pdb.update_cache(force=True) assert len(pdb.entities()) == num_packs - + # bang on the cache # access the Entities directly, as there is no iteration interface # yet ( or required for now ) sha_list = list(pdb.sha_iter()) assert len(sha_list) == pdb.size() - + # hit all packs in random order random.shuffle(sha_list) - + for sha in sha_list: - info = pdb.info(sha) - stream = pdb.stream(sha) + pdb.info(sha) + pdb.stream(sha) # END for each sha to query - - + + # test short finding - be a bit more brutal here max_bytes = 19 min_bytes = 2 @@ -64,10 +67,10 @@ class TestPackDB(TestDBBase): pass # valid, we can have short objects # END exception handling # END for each sha to find - + # we should have at least one ambiguous, considering the small sizes - # but in our pack, there is no ambigious ... + # but in our pack, there is no ambigious ... # assert num_ambiguous - + # non-existing - self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, "\0\0", 4) + self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, b'\0\0', 4) diff --git a/gitdb/test/db/test_ref.py b/gitdb/test/db/test_ref.py index 1637bff..db93082 100644 --- a/gitdb/test/db/test_ref.py +++ b/gitdb/test/db/test_ref.py @@ -2,59 +2,58 @@ # # This module is part of GitDB and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php -from lib import * +from gitdb.test.db.lib import ( + TestDBBase, + with_rw_directory, + fixture_path +) from gitdb.db import ReferenceDB from gitdb.util import ( - NULL_BIN_SHA, - hex_to_bin - ) + NULL_BIN_SHA, + hex_to_bin +) import os - + class TestReferenceDB(TestDBBase): - + def make_alt_file(self, alt_path, alt_list): """Create an alternates file which contains the given alternates. The list can be empty""" alt_file = open(alt_path, "wb") for alt in alt_list: - alt_file.write(alt + "\n") + alt_file.write(alt.encode("utf-8") + "\n".encode("ascii")) alt_file.close() - + @with_rw_directory def test_writing(self, path): - NULL_BIN_SHA = '\0' * 20 - alt_path = os.path.join(path, 'alternates') rdb = ReferenceDB(alt_path) assert len(rdb.databases()) == 0 assert rdb.size() == 0 assert len(list(rdb.sha_iter())) == 0 - + # try empty, non-existing assert not rdb.has_object(NULL_BIN_SHA) - - + # setup alternate file # add two, one is invalid own_repo_path = fixture_path('../../../.git/objects') # use own repo self.make_alt_file(alt_path, [own_repo_path, "invalid/path"]) rdb.update_cache() assert len(rdb.databases()) == 1 - + # we should now find a default revision of ours gitdb_sha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976") assert rdb.has_object(gitdb_sha) - + # remove valid self.make_alt_file(alt_path, ["just/one/invalid/path"]) rdb.update_cache() assert len(rdb.databases()) == 0 - + # add valid self.make_alt_file(alt_path, [own_repo_path]) rdb.update_cache() assert len(rdb.databases()) == 1 - - |
