summaryrefslogtreecommitdiff
path: root/gitdb/test/db
diff options
context:
space:
mode:
Diffstat (limited to 'gitdb/test/db')
-rw-r--r--gitdb/test/db/lib.py149
-rw-r--r--gitdb/test/db/test_git.py33
-rw-r--r--gitdb/test/db/test_loose.py21
-rw-r--r--gitdb/test/db/test_mem.py23
-rw-r--r--gitdb/test/db/test_pack.py41
-rw-r--r--gitdb/test/db/test_ref.py35
6 files changed, 117 insertions, 185 deletions
diff --git a/gitdb/test/db/lib.py b/gitdb/test/db/lib.py
index 62614ee..af6d9e0 100644
--- a/gitdb/test/db/lib.py
+++ b/gitdb/test/db/lib.py
@@ -6,63 +6,66 @@
from gitdb.test.lib import (
with_rw_directory,
with_packs_rw,
- ZippedStoreShaWriter,
fixture_path,
TestBase
- )
+)
-from gitdb.stream import Sha1Writer
+from gitdb.stream import (
+ Sha1Writer,
+ ZippedStoreShaWriter
+)
from gitdb.base import (
- IStream,
- OStream,
- OInfo
- )
-
+ IStream,
+ OStream,
+ OInfo
+)
+
from gitdb.exc import BadObject
from gitdb.typ import str_blob_type
+from gitdb.utils.compat import xrange
+
+from io import BytesIO
-from async import IteratorReader
-from cStringIO import StringIO
from struct import pack
__all__ = ('TestDBBase', 'with_rw_directory', 'with_packs_rw', 'fixture_path')
-
+
class TestDBBase(TestBase):
"""Base class providing testing routines on databases"""
-
+
# data
- two_lines = "1234\nhello world"
+ two_lines = b'1234\nhello world'
all_data = (two_lines, )
-
+
def _assert_object_writing_simple(self, db):
# write a bunch of objects and query their streams and info
null_objs = db.size()
ni = 250
for i in xrange(ni):
data = pack(">L", i)
- istream = IStream(str_blob_type, len(data), StringIO(data))
+ istream = IStream(str_blob_type, len(data), BytesIO(data))
new_istream = db.store(istream)
assert new_istream is istream
assert db.has_object(istream.binsha)
-
+
info = db.info(istream.binsha)
assert isinstance(info, OInfo)
assert info.type == istream.type and info.size == istream.size
-
+
stream = db.stream(istream.binsha)
assert isinstance(stream, OStream)
assert stream.binsha == info.binsha and stream.type == info.type
assert stream.read() == data
# END for each item
-
+
assert db.size() == null_objs + ni
shas = list(db.sha_iter())
assert len(shas) == db.size()
assert len(shas[0]) == 20
-
-
+
+
def _assert_object_writing(self, db):
"""General tests to verify object writing, compatible to ObjectDBW
**Note:** requires write access to the database"""
@@ -76,25 +79,24 @@ class TestDBBase(TestBase):
ostream = ostreamcls()
assert isinstance(ostream, Sha1Writer)
# END create ostream
-
+
prev_ostream = db.set_ostream(ostream)
- assert type(prev_ostream) in ostreams or prev_ostream in ostreams
-
- istream = IStream(str_blob_type, len(data), StringIO(data))
-
+ assert type(prev_ostream) in ostreams or prev_ostream in ostreams
+ istream = IStream(str_blob_type, len(data), BytesIO(data))
+
# store returns same istream instance, with new sha set
my_istream = db.store(istream)
sha = istream.binsha
assert my_istream is istream
assert db.has_object(sha) != dry_run
- assert len(sha) == 20
-
+ assert len(sha) == 20
+
# verify data - the slow way, we want to run code
if not dry_run:
info = db.info(sha)
assert str_blob_type == info.type
assert info.size == len(data)
-
+
ostream = db.stream(sha)
assert ostream.read() == data
assert ostream.type == str_blob_type
@@ -102,107 +104,26 @@ class TestDBBase(TestBase):
else:
self.failUnlessRaises(BadObject, db.info, sha)
self.failUnlessRaises(BadObject, db.stream, sha)
-
+
# DIRECT STREAM COPY
# our data hase been written in object format to the StringIO
# we pasesd as output stream. No physical database representation
# was created.
- # Test direct stream copy of object streams, the result must be
+ # Test direct stream copy of object streams, the result must be
# identical to what we fed in
ostream.seek(0)
istream.stream = ostream
assert istream.binsha is not None
prev_sha = istream.binsha
-
+
db.set_ostream(ZippedStoreShaWriter())
db.store(istream)
assert istream.binsha == prev_sha
new_ostream = db.ostream()
-
+
# note: only works as long our store write uses the same compression
# level, which is zip_best
assert ostream.getvalue() == new_ostream.getvalue()
# END for each data set
# END for each dry_run mode
-
- def _assert_object_writing_async(self, db):
- """Test generic object writing using asynchronous access"""
- ni = 5000
- def istream_generator(offset=0, ni=ni):
- for data_src in xrange(ni):
- data = str(data_src + offset)
- yield IStream(str_blob_type, len(data), StringIO(data))
- # END for each item
- # END generator utility
-
- # for now, we are very trusty here as we expect it to work if it worked
- # in the single-stream case
-
- # write objects
- reader = IteratorReader(istream_generator())
- istream_reader = db.store_async(reader)
- istreams = istream_reader.read() # read all
- assert istream_reader.task().error() is None
- assert len(istreams) == ni
-
- for stream in istreams:
- assert stream.error is None
- assert len(stream.binsha) == 20
- assert isinstance(stream, IStream)
- # END assert each stream
-
- # test has-object-async - we must have all previously added ones
- reader = IteratorReader( istream.binsha for istream in istreams )
- hasobject_reader = db.has_object_async(reader)
- count = 0
- for sha, has_object in hasobject_reader:
- assert has_object
- count += 1
- # END for each sha
- assert count == ni
-
- # read the objects we have just written
- reader = IteratorReader( istream.binsha for istream in istreams )
- ostream_reader = db.stream_async(reader)
-
- # read items individually to prevent hitting possible sys-limits
- count = 0
- for ostream in ostream_reader:
- assert isinstance(ostream, OStream)
- count += 1
- # END for each ostream
- assert ostream_reader.task().error() is None
- assert count == ni
-
- # get info about our items
- reader = IteratorReader( istream.binsha for istream in istreams )
- info_reader = db.info_async(reader)
-
- count = 0
- for oinfo in info_reader:
- assert isinstance(oinfo, OInfo)
- count += 1
- # END for each oinfo instance
- assert count == ni
-
-
- # combined read-write using a converter
- # add 2500 items, and obtain their output streams
- nni = 2500
- reader = IteratorReader(istream_generator(offset=ni, ni=nni))
- istream_to_sha = lambda istreams: [ istream.binsha for istream in istreams ]
-
- istream_reader = db.store_async(reader)
- istream_reader.set_post_cb(istream_to_sha)
-
- ostream_reader = db.stream_async(istream_reader)
-
- count = 0
- # read it individually, otherwise we might run into the ulimit
- for ostream in ostream_reader:
- assert isinstance(ostream, OStream)
- count += 1
- # END for each ostream
- assert count == nni
-
-
+
diff --git a/gitdb/test/db/test_git.py b/gitdb/test/db/test_git.py
index 1ef577a..e141c2b 100644
--- a/gitdb/test/db/test_git.py
+++ b/gitdb/test/db/test_git.py
@@ -2,46 +2,51 @@
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-from lib import *
+from gitdb.test.db.lib import (
+ TestDBBase,
+ fixture_path,
+ with_rw_directory
+)
from gitdb.exc import BadObject
from gitdb.db import GitDB
from gitdb.base import OStream, OInfo
from gitdb.util import hex_to_bin, bin_to_hex
-
+
class TestGitDB(TestDBBase):
-
+
def test_reading(self):
gdb = GitDB(fixture_path('../../../.git/objects'))
-
+
# we have packs and loose objects, alternates doesn't necessarily exist
assert 1 < len(gdb.databases()) < 4
-
+
# access should be possible
gitdb_sha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
assert isinstance(gdb.info(gitdb_sha), OInfo)
assert isinstance(gdb.stream(gitdb_sha), OStream)
- assert gdb.size() > 200
+ ni = 50
+ assert gdb.size() >= ni
sha_list = list(gdb.sha_iter())
assert len(sha_list) == gdb.size()
-
-
- # This is actually a test for compound functionality, but it doesn't
+ sha_list = sha_list[:ni] # speed up tests ...
+
+
+ # This is actually a test for compound functionality, but it doesn't
# have a separate test module
# test partial shas
# this one as uneven and quite short
assert gdb.partial_to_complete_sha_hex('155b6') == hex_to_bin("155b62a9af0aa7677078331e111d0f7aa6eb4afc")
-
+
# mix even/uneven hexshas
for i, binsha in enumerate(sha_list):
assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8-(i%2)]) == binsha
# END for each sha
-
+
self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, "0000")
-
+
@with_rw_directory
def test_writing(self, path):
gdb = GitDB(path)
-
+
# its possible to write objects
self._assert_object_writing(gdb)
- self._assert_object_writing_async(gdb)
diff --git a/gitdb/test/db/test_loose.py b/gitdb/test/db/test_loose.py
index d7e1d01..1d6af9c 100644
--- a/gitdb/test/db/test_loose.py
+++ b/gitdb/test/db/test_loose.py
@@ -2,33 +2,34 @@
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-from lib import *
+from gitdb.test.db.lib import (
+ TestDBBase,
+ with_rw_directory
+)
from gitdb.db import LooseObjectDB
from gitdb.exc import BadObject
from gitdb.util import bin_to_hex
-
+
class TestLooseDB(TestDBBase):
-
+
@with_rw_directory
def test_basics(self, path):
ldb = LooseObjectDB(path)
-
+
# write data
self._assert_object_writing(ldb)
- self._assert_object_writing_async(ldb)
-
+
# verify sha iteration and size
shas = list(ldb.sha_iter())
assert shas and len(shas[0]) == 20
-
+
assert len(shas) == ldb.size()
-
+
# verify find short object
long_sha = bin_to_hex(shas[-1])
for short_sha in (long_sha[:20], long_sha[:5]):
assert bin_to_hex(ldb.partial_to_complete_sha_hex(short_sha)) == long_sha
# END for each sha
-
+
self.failUnlessRaises(BadObject, ldb.partial_to_complete_sha_hex, '0000')
# raises if no object could be foudn
-
diff --git a/gitdb/test/db/test_mem.py b/gitdb/test/db/test_mem.py
index df428e2..97f7217 100644
--- a/gitdb/test/db/test_mem.py
+++ b/gitdb/test/db/test_mem.py
@@ -2,29 +2,32 @@
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-from lib import *
+from gitdb.test.db.lib import (
+ TestDBBase,
+ with_rw_directory
+)
from gitdb.db import (
- MemoryDB,
- LooseObjectDB
- )
-
+ MemoryDB,
+ LooseObjectDB
+)
+
class TestMemoryDB(TestDBBase):
-
+
@with_rw_directory
def test_writing(self, path):
mdb = MemoryDB()
-
+
# write data
self._assert_object_writing_simple(mdb)
-
+
# test stream copy
ldb = LooseObjectDB(path)
assert ldb.size() == 0
num_streams_copied = mdb.stream_copy(mdb.sha_iter(), ldb)
assert num_streams_copied == mdb.size()
-
+
assert ldb.size() == mdb.size()
for sha in mdb.sha_iter():
assert ldb.has_object(sha)
- assert ldb.stream(sha).read() == mdb.stream(sha).read()
+ assert ldb.stream(sha).read() == mdb.stream(sha).read()
# END verify objects where copied and are equal
diff --git a/gitdb/test/db/test_pack.py b/gitdb/test/db/test_pack.py
index f4cb5bb..963a71a 100644
--- a/gitdb/test/db/test_pack.py
+++ b/gitdb/test/db/test_pack.py
@@ -2,9 +2,12 @@
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-from lib import *
+from gitdb.test.db.lib import (
+ TestDBBase,
+ with_rw_directory,
+ with_packs_rw
+)
from gitdb.db import PackedDB
-from gitdb.test.lib import fixture_path
from gitdb.exc import BadObject, AmbiguousObjectName
@@ -12,45 +15,45 @@ import os
import random
class TestPackDB(TestDBBase):
-
+
@with_rw_directory
@with_packs_rw
def test_writing(self, path):
pdb = PackedDB(path)
-
+
# on demand, we init our pack cache
num_packs = len(pdb.entities())
assert pdb._st_mtime != 0
-
- # test pack directory changed:
+
+ # test pack directory changed:
# packs removed - rename a file, should affect the glob
pack_path = pdb.entities()[0].pack().path()
new_pack_path = pack_path + "renamed"
os.rename(pack_path, new_pack_path)
-
+
pdb.update_cache(force=True)
assert len(pdb.entities()) == num_packs - 1
-
+
# packs added
os.rename(new_pack_path, pack_path)
pdb.update_cache(force=True)
assert len(pdb.entities()) == num_packs
-
+
# bang on the cache
# access the Entities directly, as there is no iteration interface
# yet ( or required for now )
sha_list = list(pdb.sha_iter())
assert len(sha_list) == pdb.size()
-
+
# hit all packs in random order
random.shuffle(sha_list)
-
+
for sha in sha_list:
- info = pdb.info(sha)
- stream = pdb.stream(sha)
+ pdb.info(sha)
+ pdb.stream(sha)
# END for each sha to query
-
-
+
+
# test short finding - be a bit more brutal here
max_bytes = 19
min_bytes = 2
@@ -64,10 +67,10 @@ class TestPackDB(TestDBBase):
pass # valid, we can have short objects
# END exception handling
# END for each sha to find
-
+
# we should have at least one ambiguous, considering the small sizes
- # but in our pack, there is no ambigious ...
+ # but in our pack, there is no ambigious ...
# assert num_ambiguous
-
+
# non-existing
- self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, "\0\0", 4)
+ self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, b'\0\0', 4)
diff --git a/gitdb/test/db/test_ref.py b/gitdb/test/db/test_ref.py
index 1637bff..db93082 100644
--- a/gitdb/test/db/test_ref.py
+++ b/gitdb/test/db/test_ref.py
@@ -2,59 +2,58 @@
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
-from lib import *
+from gitdb.test.db.lib import (
+ TestDBBase,
+ with_rw_directory,
+ fixture_path
+)
from gitdb.db import ReferenceDB
from gitdb.util import (
- NULL_BIN_SHA,
- hex_to_bin
- )
+ NULL_BIN_SHA,
+ hex_to_bin
+)
import os
-
+
class TestReferenceDB(TestDBBase):
-
+
def make_alt_file(self, alt_path, alt_list):
"""Create an alternates file which contains the given alternates.
The list can be empty"""
alt_file = open(alt_path, "wb")
for alt in alt_list:
- alt_file.write(alt + "\n")
+ alt_file.write(alt.encode("utf-8") + "\n".encode("ascii"))
alt_file.close()
-
+
@with_rw_directory
def test_writing(self, path):
- NULL_BIN_SHA = '\0' * 20
-
alt_path = os.path.join(path, 'alternates')
rdb = ReferenceDB(alt_path)
assert len(rdb.databases()) == 0
assert rdb.size() == 0
assert len(list(rdb.sha_iter())) == 0
-
+
# try empty, non-existing
assert not rdb.has_object(NULL_BIN_SHA)
-
-
+
# setup alternate file
# add two, one is invalid
own_repo_path = fixture_path('../../../.git/objects') # use own repo
self.make_alt_file(alt_path, [own_repo_path, "invalid/path"])
rdb.update_cache()
assert len(rdb.databases()) == 1
-
+
# we should now find a default revision of ours
gitdb_sha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
assert rdb.has_object(gitdb_sha)
-
+
# remove valid
self.make_alt_file(alt_path, ["just/one/invalid/path"])
rdb.update_cache()
assert len(rdb.databases()) == 0
-
+
# add valid
self.make_alt_file(alt_path, [own_repo_path])
rdb.update_cache()
assert len(rdb.databases()) == 1
-
-