path: root/gitdb/test
author    Sebastian Thiel <byronimo@gmail.com>    2014-02-09 20:51:43 +0100
committer Sebastian Thiel <byronimo@gmail.com>    2014-02-09 20:51:43 +0100
commit    6576d5503a64d124fd7bcf639cc8955918b3ac43 (patch)
tree      847028954b05307086eda1782c2e9521c8d67a13 /gitdb/test
parent    ea54328ce05abdcb4f23300df51422e62b737f63 (diff)
download  gitdb-6576d5503a64d124fd7bcf639cc8955918b3ac43.tar.gz
tabs to spaces
Diffstat (limited to 'gitdb/test')
-rw-r--r--  gitdb/test/__init__.py                          8
-rw-r--r--  gitdb/test/db/lib.py                          376
-rw-r--r--  gitdb/test/db/test_git.py                      74
-rw-r--r--  gitdb/test/db/test_loose.py                    50
-rw-r--r--  gitdb/test/db/test_mem.py                      46
-rw-r--r--  gitdb/test/db/test_pack.py                    118
-rw-r--r--  gitdb/test/db/test_ref.py                      98
-rw-r--r--  gitdb/test/lib.py                             206
-rw-r--r--  gitdb/test/performance/lib.py                  56
-rw-r--r--  gitdb/test/performance/test_pack.py           150
-rw-r--r--  gitdb/test/performance/test_pack_streaming.py 120
-rw-r--r--  gitdb/test/performance/test_stream.py         324
-rw-r--r--  gitdb/test/test_base.py                       168
-rw-r--r--  gitdb/test/test_example.py                    102
-rw-r--r--  gitdb/test/test_pack.py                       438
-rw-r--r--  gitdb/test/test_stream.py                     266
-rw-r--r--  gitdb/test/test_util.py                       184
17 files changed, 1392 insertions, 1392 deletions
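
The change itself is whitespace-only: every touched line trades leading tabs for spaces, which is why the diffstat shows exactly as many insertions as deletions. For orientation, a conversion of this kind can be scripted in a few lines; the sketch below is illustrative only, not the tool actually used for this commit, and it assumes four spaces per tab.

import fileinput
import sys

# Rewrite the files named on the command line in place, expanding
# leading tabs to four spaces each and leaving the rest of the line alone.
for line in fileinput.input(sys.argv[1:], inplace=True):
    body = line.lstrip('\t')
    sys.stdout.write(' ' * 4 * (len(line) - len(body)) + body)
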
diff --git a/gitdb/test/__init__.py b/gitdb/test/__init__.py
index 760f531..f805944 100644
--- a/gitdb/test/__init__.py
+++ b/gitdb/test/__init__.py
@@ -7,10 +7,10 @@ import gitdb.util
#{ Initialization
def _init_pool():
- """Assure the pool is actually threaded"""
- size = 2
- print "Setting ThreadPool to %i" % size
- gitdb.util.pool.set_size(size)
+ """Assure the pool is actually threaded"""
+ size = 2
+ print "Setting ThreadPool to %i" % size
+ gitdb.util.pool.set_size(size)
#} END initialization
diff --git a/gitdb/test/db/lib.py b/gitdb/test/db/lib.py
index 4af4483..62614ee 100644
--- a/gitdb/test/db/lib.py
+++ b/gitdb/test/db/lib.py
@@ -4,21 +4,21 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Base classes for object db testing"""
from gitdb.test.lib import (
- with_rw_directory,
- with_packs_rw,
- ZippedStoreShaWriter,
- fixture_path,
- TestBase
- )
+ with_rw_directory,
+ with_packs_rw,
+ ZippedStoreShaWriter,
+ fixture_path,
+ TestBase
+ )
from gitdb.stream import Sha1Writer
from gitdb.base import (
- IStream,
- OStream,
- OInfo
- )
-
+ IStream,
+ OStream,
+ OInfo
+ )
+
from gitdb.exc import BadObject
from gitdb.typ import str_blob_type
@@ -28,181 +28,181 @@ from struct import pack
__all__ = ('TestDBBase', 'with_rw_directory', 'with_packs_rw', 'fixture_path')
-
+
class TestDBBase(TestBase):
- """Base class providing testing routines on databases"""
-
- # data
- two_lines = "1234\nhello world"
- all_data = (two_lines, )
-
- def _assert_object_writing_simple(self, db):
- # write a bunch of objects and query their streams and info
- null_objs = db.size()
- ni = 250
- for i in xrange(ni):
- data = pack(">L", i)
- istream = IStream(str_blob_type, len(data), StringIO(data))
- new_istream = db.store(istream)
- assert new_istream is istream
- assert db.has_object(istream.binsha)
-
- info = db.info(istream.binsha)
- assert isinstance(info, OInfo)
- assert info.type == istream.type and info.size == istream.size
-
- stream = db.stream(istream.binsha)
- assert isinstance(stream, OStream)
- assert stream.binsha == info.binsha and stream.type == info.type
- assert stream.read() == data
- # END for each item
-
- assert db.size() == null_objs + ni
- shas = list(db.sha_iter())
- assert len(shas) == db.size()
- assert len(shas[0]) == 20
-
-
- def _assert_object_writing(self, db):
- """General tests to verify object writing, compatible to ObjectDBW
- **Note:** requires write access to the database"""
- # start in 'dry-run' mode, using a simple sha1 writer
- ostreams = (ZippedStoreShaWriter, None)
- for ostreamcls in ostreams:
- for data in self.all_data:
- dry_run = ostreamcls is not None
- ostream = None
- if ostreamcls is not None:
- ostream = ostreamcls()
- assert isinstance(ostream, Sha1Writer)
- # END create ostream
-
- prev_ostream = db.set_ostream(ostream)
- assert type(prev_ostream) in ostreams or prev_ostream in ostreams
-
- istream = IStream(str_blob_type, len(data), StringIO(data))
-
- # store returns same istream instance, with new sha set
- my_istream = db.store(istream)
- sha = istream.binsha
- assert my_istream is istream
- assert db.has_object(sha) != dry_run
- assert len(sha) == 20
-
- # verify data - the slow way, we want to run code
- if not dry_run:
- info = db.info(sha)
- assert str_blob_type == info.type
- assert info.size == len(data)
-
- ostream = db.stream(sha)
- assert ostream.read() == data
- assert ostream.type == str_blob_type
- assert ostream.size == len(data)
- else:
- self.failUnlessRaises(BadObject, db.info, sha)
- self.failUnlessRaises(BadObject, db.stream, sha)
-
- # DIRECT STREAM COPY
- # our data has been written in object format to the StringIO
- # we passed as output stream. No physical database representation
- # was created.
- # Test direct stream copy of object streams, the result must be
- # identical to what we fed in
- ostream.seek(0)
- istream.stream = ostream
- assert istream.binsha is not None
- prev_sha = istream.binsha
-
- db.set_ostream(ZippedStoreShaWriter())
- db.store(istream)
- assert istream.binsha == prev_sha
- new_ostream = db.ostream()
-
- # note: only works as long as our store write uses the same compression
- # level, which is zip_best
- assert ostream.getvalue() == new_ostream.getvalue()
- # END for each data set
- # END for each dry_run mode
-
- def _assert_object_writing_async(self, db):
- """Test generic object writing using asynchronous access"""
- ni = 5000
- def istream_generator(offset=0, ni=ni):
- for data_src in xrange(ni):
- data = str(data_src + offset)
- yield IStream(str_blob_type, len(data), StringIO(data))
- # END for each item
- # END generator utility
-
- # for now, we are very trusty here as we expect it to work if it worked
- # in the single-stream case
-
- # write objects
- reader = IteratorReader(istream_generator())
- istream_reader = db.store_async(reader)
- istreams = istream_reader.read() # read all
- assert istream_reader.task().error() is None
- assert len(istreams) == ni
-
- for stream in istreams:
- assert stream.error is None
- assert len(stream.binsha) == 20
- assert isinstance(stream, IStream)
- # END assert each stream
-
- # test has-object-async - we must have all previously added ones
- reader = IteratorReader( istream.binsha for istream in istreams )
- hasobject_reader = db.has_object_async(reader)
- count = 0
- for sha, has_object in hasobject_reader:
- assert has_object
- count += 1
- # END for each sha
- assert count == ni
-
- # read the objects we have just written
- reader = IteratorReader( istream.binsha for istream in istreams )
- ostream_reader = db.stream_async(reader)
-
- # read items individually to prevent hitting possible sys-limits
- count = 0
- for ostream in ostream_reader:
- assert isinstance(ostream, OStream)
- count += 1
- # END for each ostream
- assert ostream_reader.task().error() is None
- assert count == ni
-
- # get info about our items
- reader = IteratorReader( istream.binsha for istream in istreams )
- info_reader = db.info_async(reader)
-
- count = 0
- for oinfo in info_reader:
- assert isinstance(oinfo, OInfo)
- count += 1
- # END for each oinfo instance
- assert count == ni
-
-
- # combined read-write using a converter
- # add 2500 items, and obtain their output streams
- nni = 2500
- reader = IteratorReader(istream_generator(offset=ni, ni=nni))
- istream_to_sha = lambda istreams: [ istream.binsha for istream in istreams ]
-
- istream_reader = db.store_async(reader)
- istream_reader.set_post_cb(istream_to_sha)
-
- ostream_reader = db.stream_async(istream_reader)
-
- count = 0
- # read it individually, otherwise we might run into the ulimit
- for ostream in ostream_reader:
- assert isinstance(ostream, OStream)
- count += 1
- # END for each ostream
- assert count == nni
-
-
+ """Base class providing testing routines on databases"""
+
+ # data
+ two_lines = "1234\nhello world"
+ all_data = (two_lines, )
+
+ def _assert_object_writing_simple(self, db):
+ # write a bunch of objects and query their streams and info
+ null_objs = db.size()
+ ni = 250
+ for i in xrange(ni):
+ data = pack(">L", i)
+ istream = IStream(str_blob_type, len(data), StringIO(data))
+ new_istream = db.store(istream)
+ assert new_istream is istream
+ assert db.has_object(istream.binsha)
+
+ info = db.info(istream.binsha)
+ assert isinstance(info, OInfo)
+ assert info.type == istream.type and info.size == istream.size
+
+ stream = db.stream(istream.binsha)
+ assert isinstance(stream, OStream)
+ assert stream.binsha == info.binsha and stream.type == info.type
+ assert stream.read() == data
+ # END for each item
+
+ assert db.size() == null_objs + ni
+ shas = list(db.sha_iter())
+ assert len(shas) == db.size()
+ assert len(shas[0]) == 20
+
+
+ def _assert_object_writing(self, db):
+ """General tests to verify object writing, compatible to ObjectDBW
+ **Note:** requires write access to the database"""
+ # start in 'dry-run' mode, using a simple sha1 writer
+ ostreams = (ZippedStoreShaWriter, None)
+ for ostreamcls in ostreams:
+ for data in self.all_data:
+ dry_run = ostreamcls is not None
+ ostream = None
+ if ostreamcls is not None:
+ ostream = ostreamcls()
+ assert isinstance(ostream, Sha1Writer)
+ # END create ostream
+
+ prev_ostream = db.set_ostream(ostream)
+ assert type(prev_ostream) in ostreams or prev_ostream in ostreams
+
+ istream = IStream(str_blob_type, len(data), StringIO(data))
+
+ # store returns same istream instance, with new sha set
+ my_istream = db.store(istream)
+ sha = istream.binsha
+ assert my_istream is istream
+ assert db.has_object(sha) != dry_run
+ assert len(sha) == 20
+
+ # verify data - the slow way, we want to run code
+ if not dry_run:
+ info = db.info(sha)
+ assert str_blob_type == info.type
+ assert info.size == len(data)
+
+ ostream = db.stream(sha)
+ assert ostream.read() == data
+ assert ostream.type == str_blob_type
+ assert ostream.size == len(data)
+ else:
+ self.failUnlessRaises(BadObject, db.info, sha)
+ self.failUnlessRaises(BadObject, db.stream, sha)
+
+ # DIRECT STREAM COPY
+ # our data has been written in object format to the StringIO
+ # we passed as output stream. No physical database representation
+ # was created.
+ # Test direct stream copy of object streams, the result must be
+ # identical to what we fed in
+ ostream.seek(0)
+ istream.stream = ostream
+ assert istream.binsha is not None
+ prev_sha = istream.binsha
+
+ db.set_ostream(ZippedStoreShaWriter())
+ db.store(istream)
+ assert istream.binsha == prev_sha
+ new_ostream = db.ostream()
+
+ # note: only works as long as our store write uses the same compression
+ # level, which is zip_best
+ assert ostream.getvalue() == new_ostream.getvalue()
+ # END for each data set
+ # END for each dry_run mode
+
+ def _assert_object_writing_async(self, db):
+ """Test generic object writing using asynchronous access"""
+ ni = 5000
+ def istream_generator(offset=0, ni=ni):
+ for data_src in xrange(ni):
+ data = str(data_src + offset)
+ yield IStream(str_blob_type, len(data), StringIO(data))
+ # END for each item
+ # END generator utility
+
+ # for now, we are very trusty here as we expect it to work if it worked
+ # in the single-stream case
+
+ # write objects
+ reader = IteratorReader(istream_generator())
+ istream_reader = db.store_async(reader)
+ istreams = istream_reader.read() # read all
+ assert istream_reader.task().error() is None
+ assert len(istreams) == ni
+
+ for stream in istreams:
+ assert stream.error is None
+ assert len(stream.binsha) == 20
+ assert isinstance(stream, IStream)
+ # END assert each stream
+
+ # test has-object-async - we must have all previously added ones
+ reader = IteratorReader( istream.binsha for istream in istreams )
+ hasobject_reader = db.has_object_async(reader)
+ count = 0
+ for sha, has_object in hasobject_reader:
+ assert has_object
+ count += 1
+ # END for each sha
+ assert count == ni
+
+ # read the objects we have just written
+ reader = IteratorReader( istream.binsha for istream in istreams )
+ ostream_reader = db.stream_async(reader)
+
+ # read items individually to prevent hitting possible sys-limits
+ count = 0
+ for ostream in ostream_reader:
+ assert isinstance(ostream, OStream)
+ count += 1
+ # END for each ostream
+ assert ostream_reader.task().error() is None
+ assert count == ni
+
+ # get info about our items
+ reader = IteratorReader( istream.binsha for istream in istreams )
+ info_reader = db.info_async(reader)
+
+ count = 0
+ for oinfo in info_reader:
+ assert isinstance(oinfo, OInfo)
+ count += 1
+ # END for each oinfo instance
+ assert count == ni
+
+
+ # combined read-write using a converter
+ # add 2500 items, and obtain their output streams
+ nni = 2500
+ reader = IteratorReader(istream_generator(offset=ni, ni=nni))
+ istream_to_sha = lambda istreams: [ istream.binsha for istream in istreams ]
+
+ istream_reader = db.store_async(reader)
+ istream_reader.set_post_cb(istream_to_sha)
+
+ ostream_reader = db.stream_async(istream_reader)
+
+ count = 0
+ # read it individually, otherwise we might run into the ulimit
+ for ostream in ostream_reader:
+ assert isinstance(ostream, OStream)
+ count += 1
+ # END for each ostream
+ assert count == nni
+
+
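
The helpers above all revolve around the same round trip: wrap raw data in an IStream, store() it, then look the object up again by its binsha via info() and stream(). A minimal standalone sketch of that flow, assuming the same gitdb API the tests import and using MemoryDB purely as a convenient backend:

from cStringIO import StringIO

from gitdb import IStream
from gitdb.db import MemoryDB
from gitdb.typ import str_blob_type

db = MemoryDB()
data = "1234\nhello world"
# store() fills in the binsha on the very IStream instance it was given
istream = db.store(IStream(str_blob_type, len(data), StringIO(data)))
assert db.has_object(istream.binsha)
assert db.info(istream.binsha).size == len(data)
assert db.stream(istream.binsha).read() == data
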
diff --git a/gitdb/test/db/test_git.py b/gitdb/test/db/test_git.py
index 3101163..1ef577a 100644
--- a/gitdb/test/db/test_git.py
+++ b/gitdb/test/db/test_git.py
@@ -7,41 +7,41 @@ from gitdb.exc import BadObject
from gitdb.db import GitDB
from gitdb.base import OStream, OInfo
from gitdb.util import hex_to_bin, bin_to_hex
-
+
class TestGitDB(TestDBBase):
-
- def test_reading(self):
- gdb = GitDB(fixture_path('../../../.git/objects'))
-
- # we have packs and loose objects, alternates doesn't necessarily exist
- assert 1 < len(gdb.databases()) < 4
-
- # access should be possible
- gitdb_sha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
- assert isinstance(gdb.info(gitdb_sha), OInfo)
- assert isinstance(gdb.stream(gitdb_sha), OStream)
- assert gdb.size() > 200
- sha_list = list(gdb.sha_iter())
- assert len(sha_list) == gdb.size()
-
-
- # This is actually a test for compound functionality, but it doesn't
- # have a separate test module
- # test partial shas
- # this one is uneven and quite short
- assert gdb.partial_to_complete_sha_hex('155b6') == hex_to_bin("155b62a9af0aa7677078331e111d0f7aa6eb4afc")
-
- # mix even/uneven hexshas
- for i, binsha in enumerate(sha_list):
- assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8-(i%2)]) == binsha
- # END for each sha
-
- self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, "0000")
-
- @with_rw_directory
- def test_writing(self, path):
- gdb = GitDB(path)
-
- # it's possible to write objects
- self._assert_object_writing(gdb)
- self._assert_object_writing_async(gdb)
+
+ def test_reading(self):
+ gdb = GitDB(fixture_path('../../../.git/objects'))
+
+ # we have packs and loose objects, alternates doesn't necessarily exist
+ assert 1 < len(gdb.databases()) < 4
+
+ # access should be possible
+ gitdb_sha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
+ assert isinstance(gdb.info(gitdb_sha), OInfo)
+ assert isinstance(gdb.stream(gitdb_sha), OStream)
+ assert gdb.size() > 200
+ sha_list = list(gdb.sha_iter())
+ assert len(sha_list) == gdb.size()
+
+
+ # This is actually a test for compound functionality, but it doesn't
+ # have a separate test module
+ # test partial shas
+ # this one is uneven and quite short
+ assert gdb.partial_to_complete_sha_hex('155b6') == hex_to_bin("155b62a9af0aa7677078331e111d0f7aa6eb4afc")
+
+ # mix even/uneven hexshas
+ for i, binsha in enumerate(sha_list):
+ assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8-(i%2)]) == binsha
+ # END for each sha
+
+ self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, "0000")
+
+ @with_rw_directory
+ def test_writing(self, path):
+ gdb = GitDB(path)
+
+ # it's possible to write objects
+ self._assert_object_writing(gdb)
+ self._assert_object_writing_async(gdb)
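
The read-only checks above boil down to pointing GitDB at an objects directory and querying it; a hedged sketch of that access pattern, with the path below as a placeholder:

from gitdb.db import GitDB
from gitdb.util import bin_to_hex

gdb = GitDB("/path/to/some/repo/.git/objects")  # placeholder path
print "database holds %i objects" % gdb.size()
for binsha in list(gdb.sha_iter())[:5]:
    info = gdb.info(binsha)
    print bin_to_hex(binsha), info.type, info.size
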
diff --git a/gitdb/test/db/test_loose.py b/gitdb/test/db/test_loose.py
index ee2d78d..d7e1d01 100644
--- a/gitdb/test/db/test_loose.py
+++ b/gitdb/test/db/test_loose.py
@@ -6,29 +6,29 @@ from lib import *
from gitdb.db import LooseObjectDB
from gitdb.exc import BadObject
from gitdb.util import bin_to_hex
-
+
class TestLooseDB(TestDBBase):
-
- @with_rw_directory
- def test_basics(self, path):
- ldb = LooseObjectDB(path)
-
- # write data
- self._assert_object_writing(ldb)
- self._assert_object_writing_async(ldb)
-
- # verify sha iteration and size
- shas = list(ldb.sha_iter())
- assert shas and len(shas[0]) == 20
-
- assert len(shas) == ldb.size()
-
- # verify find short object
- long_sha = bin_to_hex(shas[-1])
- for short_sha in (long_sha[:20], long_sha[:5]):
- assert bin_to_hex(ldb.partial_to_complete_sha_hex(short_sha)) == long_sha
- # END for each sha
-
- self.failUnlessRaises(BadObject, ldb.partial_to_complete_sha_hex, '0000')
- # raises if no object could be found
-
+
+ @with_rw_directory
+ def test_basics(self, path):
+ ldb = LooseObjectDB(path)
+
+ # write data
+ self._assert_object_writing(ldb)
+ self._assert_object_writing_async(ldb)
+
+ # verify sha iteration and size
+ shas = list(ldb.sha_iter())
+ assert shas and len(shas[0]) == 20
+
+ assert len(shas) == ldb.size()
+
+ # verify find short object
+ long_sha = bin_to_hex(shas[-1])
+ for short_sha in (long_sha[:20], long_sha[:5]):
+ assert bin_to_hex(ldb.partial_to_complete_sha_hex(short_sha)) == long_sha
+ # END for each sha
+
+ self.failUnlessRaises(BadObject, ldb.partial_to_complete_sha_hex, '0000')
+ # raises if no object could be found
+
diff --git a/gitdb/test/db/test_mem.py b/gitdb/test/db/test_mem.py
index 188cb0a..df428e2 100644
--- a/gitdb/test/db/test_mem.py
+++ b/gitdb/test/db/test_mem.py
@@ -4,27 +4,27 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
from lib import *
from gitdb.db import (
- MemoryDB,
- LooseObjectDB
- )
-
+ MemoryDB,
+ LooseObjectDB
+ )
+
class TestMemoryDB(TestDBBase):
-
- @with_rw_directory
- def test_writing(self, path):
- mdb = MemoryDB()
-
- # write data
- self._assert_object_writing_simple(mdb)
-
- # test stream copy
- ldb = LooseObjectDB(path)
- assert ldb.size() == 0
- num_streams_copied = mdb.stream_copy(mdb.sha_iter(), ldb)
- assert num_streams_copied == mdb.size()
-
- assert ldb.size() == mdb.size()
- for sha in mdb.sha_iter():
- assert ldb.has_object(sha)
- assert ldb.stream(sha).read() == mdb.stream(sha).read()
- # END verify objects were copied and are equal
+
+ @with_rw_directory
+ def test_writing(self, path):
+ mdb = MemoryDB()
+
+ # write data
+ self._assert_object_writing_simple(mdb)
+
+ # test stream copy
+ ldb = LooseObjectDB(path)
+ assert ldb.size() == 0
+ num_streams_copied = mdb.stream_copy(mdb.sha_iter(), ldb)
+ assert num_streams_copied == mdb.size()
+
+ assert ldb.size() == mdb.size()
+ for sha in mdb.sha_iter():
+ assert ldb.has_object(sha)
+ assert ldb.stream(sha).read() == mdb.stream(sha).read()
+ # END verify objects were copied and are equal
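
stream_copy(), exercised above, is also the natural way to persist objects from a MemoryDB into an on-disk database; a short sketch under the assumption that objects were stored into mdb beforehand (the target directory is a placeholder):

from gitdb.db import MemoryDB, LooseObjectDB

mdb = MemoryDB()                      # assume objects were store()d here earlier
ldb = LooseObjectDB("/tmp/objects")   # placeholder target directory
copied = mdb.stream_copy(mdb.sha_iter(), ldb)
assert copied == mdb.size()
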
diff --git a/gitdb/test/db/test_pack.py b/gitdb/test/db/test_pack.py
index e8ba6f8..f4cb5bb 100644
--- a/gitdb/test/db/test_pack.py
+++ b/gitdb/test/db/test_pack.py
@@ -12,62 +12,62 @@ import os
import random
class TestPackDB(TestDBBase):
-
- @with_rw_directory
- @with_packs_rw
- def test_writing(self, path):
- pdb = PackedDB(path)
-
- # on demand, we init our pack cache
- num_packs = len(pdb.entities())
- assert pdb._st_mtime != 0
-
- # test pack directory changed:
- # packs removed - rename a file, should affect the glob
- pack_path = pdb.entities()[0].pack().path()
- new_pack_path = pack_path + "renamed"
- os.rename(pack_path, new_pack_path)
-
- pdb.update_cache(force=True)
- assert len(pdb.entities()) == num_packs - 1
-
- # packs added
- os.rename(new_pack_path, pack_path)
- pdb.update_cache(force=True)
- assert len(pdb.entities()) == num_packs
-
- # bang on the cache
- # access the Entities directly, as there is no iteration interface
- # yet ( or required for now )
- sha_list = list(pdb.sha_iter())
- assert len(sha_list) == pdb.size()
-
- # hit all packs in random order
- random.shuffle(sha_list)
-
- for sha in sha_list:
- info = pdb.info(sha)
- stream = pdb.stream(sha)
- # END for each sha to query
-
-
- # test short finding - be a bit more brutal here
- max_bytes = 19
- min_bytes = 2
- num_ambiguous = 0
- for i, sha in enumerate(sha_list):
- short_sha = sha[:max((i % max_bytes), min_bytes)]
- try:
- assert pdb.partial_to_complete_sha(short_sha, len(short_sha)*2) == sha
- except AmbiguousObjectName:
- num_ambiguous += 1
- pass # valid, we can have short objects
- # END exception handling
- # END for each sha to find
-
- # we should have at least one ambiguous, considering the small sizes
- # but in our pack, there is no ambiguous ...
- # assert num_ambiguous
-
- # non-existing
- self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, "\0\0", 4)
+
+ @with_rw_directory
+ @with_packs_rw
+ def test_writing(self, path):
+ pdb = PackedDB(path)
+
+ # on demand, we init our pack cache
+ num_packs = len(pdb.entities())
+ assert pdb._st_mtime != 0
+
+ # test pack directory changed:
+ # packs removed - rename a file, should affect the glob
+ pack_path = pdb.entities()[0].pack().path()
+ new_pack_path = pack_path + "renamed"
+ os.rename(pack_path, new_pack_path)
+
+ pdb.update_cache(force=True)
+ assert len(pdb.entities()) == num_packs - 1
+
+ # packs added
+ os.rename(new_pack_path, pack_path)
+ pdb.update_cache(force=True)
+ assert len(pdb.entities()) == num_packs
+
+ # bang on the cache
+ # access the Entities directly, as there is no iteration interface
+ # yet ( or required for now )
+ sha_list = list(pdb.sha_iter())
+ assert len(sha_list) == pdb.size()
+
+ # hit all packs in random order
+ random.shuffle(sha_list)
+
+ for sha in sha_list:
+ info = pdb.info(sha)
+ stream = pdb.stream(sha)
+ # END for each sha to query
+
+
+ # test short finding - be a bit more brutal here
+ max_bytes = 19
+ min_bytes = 2
+ num_ambiguous = 0
+ for i, sha in enumerate(sha_list):
+ short_sha = sha[:max((i % max_bytes), min_bytes)]
+ try:
+ assert pdb.partial_to_complete_sha(short_sha, len(short_sha)*2) == sha
+ except AmbiguousObjectName:
+ num_ambiguous += 1
+ pass # valid, we can have short objects
+ # END exception handling
+ # END for each sha to find
+
+ # we should have at least one ambiguous, considering the small sizes
+ # but in our pack, there is no ambiguous ...
+ # assert num_ambiguous
+
+ # non-existing
+ self.failUnlessRaises(BadObject, pdb.partial_to_complete_sha, "\0\0", 4)
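
For reference, the random-access pattern used to bang on the cache above looks like this when stripped of the bookkeeping; a hedged sketch, with the pack directory as a placeholder:

from gitdb.db.pack import PackedDB

pdb = PackedDB("/path/to/.git/objects/pack")  # placeholder pack directory
for sha in list(pdb.sha_iter())[:10]:
    info = pdb.info(sha)            # type and size only, no payload
    data = pdb.stream(sha).read()   # full decompressed object data
    assert len(data) == info.size
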
diff --git a/gitdb/test/db/test_ref.py b/gitdb/test/db/test_ref.py
index 0d8eeeb..1637bff 100644
--- a/gitdb/test/db/test_ref.py
+++ b/gitdb/test/db/test_ref.py
@@ -6,55 +6,55 @@ from lib import *
from gitdb.db import ReferenceDB
from gitdb.util import (
- NULL_BIN_SHA,
- hex_to_bin
- )
+ NULL_BIN_SHA,
+ hex_to_bin
+ )
import os
-
+
class TestReferenceDB(TestDBBase):
-
- def make_alt_file(self, alt_path, alt_list):
- """Create an alternates file which contains the given alternates.
- The list can be empty"""
- alt_file = open(alt_path, "wb")
- for alt in alt_list:
- alt_file.write(alt + "\n")
- alt_file.close()
-
- @with_rw_directory
- def test_writing(self, path):
- NULL_BIN_SHA = '\0' * 20
-
- alt_path = os.path.join(path, 'alternates')
- rdb = ReferenceDB(alt_path)
- assert len(rdb.databases()) == 0
- assert rdb.size() == 0
- assert len(list(rdb.sha_iter())) == 0
-
- # try empty, non-existing
- assert not rdb.has_object(NULL_BIN_SHA)
-
-
- # setup alternate file
- # add two, one is invalid
- own_repo_path = fixture_path('../../../.git/objects') # use own repo
- self.make_alt_file(alt_path, [own_repo_path, "invalid/path"])
- rdb.update_cache()
- assert len(rdb.databases()) == 1
-
- # we should now find a default revision of ours
- gitdb_sha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
- assert rdb.has_object(gitdb_sha)
-
- # remove valid
- self.make_alt_file(alt_path, ["just/one/invalid/path"])
- rdb.update_cache()
- assert len(rdb.databases()) == 0
-
- # add valid
- self.make_alt_file(alt_path, [own_repo_path])
- rdb.update_cache()
- assert len(rdb.databases()) == 1
-
-
+
+ def make_alt_file(self, alt_path, alt_list):
+ """Create an alternates file which contains the given alternates.
+ The list can be empty"""
+ alt_file = open(alt_path, "wb")
+ for alt in alt_list:
+ alt_file.write(alt + "\n")
+ alt_file.close()
+
+ @with_rw_directory
+ def test_writing(self, path):
+ NULL_BIN_SHA = '\0' * 20
+
+ alt_path = os.path.join(path, 'alternates')
+ rdb = ReferenceDB(alt_path)
+ assert len(rdb.databases()) == 0
+ assert rdb.size() == 0
+ assert len(list(rdb.sha_iter())) == 0
+
+ # try empty, non-existing
+ assert not rdb.has_object(NULL_BIN_SHA)
+
+
+ # setup alternate file
+ # add two, one is invalid
+ own_repo_path = fixture_path('../../../.git/objects') # use own repo
+ self.make_alt_file(alt_path, [own_repo_path, "invalid/path"])
+ rdb.update_cache()
+ assert len(rdb.databases()) == 1
+
+ # we should now find a default revision of ours
+ gitdb_sha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
+ assert rdb.has_object(gitdb_sha)
+
+ # remove valid
+ self.make_alt_file(alt_path, ["just/one/invalid/path"])
+ rdb.update_cache()
+ assert len(rdb.databases()) == 0
+
+ # add valid
+ self.make_alt_file(alt_path, [own_repo_path])
+ rdb.update_cache()
+ assert len(rdb.databases()) == 1
+
+
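
ReferenceDB, tested above, does nothing but follow an alternates file to other object databases; the setup the test performs amounts to the following sketch (paths are placeholders):

from gitdb.db import ReferenceDB

alt_path = "/tmp/alternates"                              # placeholder alternates file
with open(alt_path, "wb") as alt_file:
    alt_file.write("/path/to/some/repo/.git/objects\n")   # placeholder objects directory
rdb = ReferenceDB(alt_path)
rdb.update_cache()
print "%i database(s) referenced" % len(rdb.databases())
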
diff --git a/gitdb/test/lib.py b/gitdb/test/lib.py
index 50645be..ac8473a 100644
--- a/gitdb/test/lib.py
+++ b/gitdb/test/lib.py
@@ -4,12 +4,12 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Utilities used in ODB testing"""
from gitdb import (
- OStream,
- )
+ OStream,
+ )
from gitdb.stream import (
- Sha1Writer,
- ZippedStoreShaWriter
- )
+ Sha1Writer,
+ ZippedStoreShaWriter
+ )
from gitdb.util import zlib
@@ -29,134 +29,134 @@ import gc
#{ Bases
class TestBase(unittest.TestCase):
- """Base class for all tests"""
-
+ """Base class for all tests"""
+
#} END bases
#{ Decorators
def with_rw_directory(func):
- """Create a temporary directory which can be written to, remove it if the
- test succeeds, but leave it otherwise to aid additional debugging"""
- def wrapper(self):
- path = tempfile.mktemp(prefix=func.__name__)
- os.mkdir(path)
- keep = False
- try:
- try:
- return func(self, path)
- except Exception:
- print >> sys.stderr, "Test %s.%s failed, output is at %r" % (type(self).__name__, func.__name__, path)
- keep = True
- raise
- finally:
- # Need to collect here to be sure all handles have been closed. It appears
- # a windows-only issue. In fact things should be deleted, as well as
- # memory maps closed, once objects go out of scope. For some reason
- # though this is not the case here unless we collect explicitly.
- if not keep:
- gc.collect()
- shutil.rmtree(path)
- # END handle exception
- # END wrapper
-
- wrapper.__name__ = func.__name__
- return wrapper
+ """Create a temporary directory which can be written to, remove it if the
+ test succeeds, but leave it otherwise to aid additional debugging"""
+ def wrapper(self):
+ path = tempfile.mktemp(prefix=func.__name__)
+ os.mkdir(path)
+ keep = False
+ try:
+ try:
+ return func(self, path)
+ except Exception:
+ print >> sys.stderr, "Test %s.%s failed, output is at %r" % (type(self).__name__, func.__name__, path)
+ keep = True
+ raise
+ finally:
+ # Need to collect here to be sure all handles have been closed. It appears
+ # a windows-only issue. In fact things should be deleted, as well as
+ # memory maps closed, once objects go out of scope. For some reason
+ # though this is not the case here unless we collect explicitly.
+ if not keep:
+ gc.collect()
+ shutil.rmtree(path)
+ # END handle exception
+ # END wrapper
+
+ wrapper.__name__ = func.__name__
+ return wrapper
def with_packs_rw(func):
- """Function that provides a path into which the packs for testing should be
- copied. Will pass on the path to the actual function afterwards"""
- def wrapper(self, path):
- src_pack_glob = fixture_path('packs/*')
- copy_files_globbed(src_pack_glob, path, hard_link_ok=True)
- return func(self, path)
- # END wrapper
-
- wrapper.__name__ = func.__name__
- return wrapper
+ """Function that provides a path into which the packs for testing should be
+ copied. Will pass on the path to the actual function afterwards"""
+ def wrapper(self, path):
+ src_pack_glob = fixture_path('packs/*')
+ copy_files_globbed(src_pack_glob, path, hard_link_ok=True)
+ return func(self, path)
+ # END wrapper
+
+ wrapper.__name__ = func.__name__
+ return wrapper
#} END decorators
#{ Routines
def fixture_path(relapath=''):
- """:return: absolute path into the fixture directory
- :param relapath: relative path into the fixtures directory, or ''
- to obtain the fixture directory itself"""
- return os.path.join(os.path.dirname(__file__), 'fixtures', relapath)
-
+ """:return: absolute path into the fixture directory
+ :param relapath: relative path into the fixtures directory, or ''
+ to obtain the fixture directory itself"""
+ return os.path.join(os.path.dirname(__file__), 'fixtures', relapath)
+
def copy_files_globbed(source_glob, target_dir, hard_link_ok=False):
- """Copy all files found according to the given source glob into the target directory
- :param hard_link_ok: if True, hard links will be created if possible. Otherwise
- the files will be copied"""
- for src_file in glob.glob(source_glob):
- if hard_link_ok and hasattr(os, 'link'):
- target = os.path.join(target_dir, os.path.basename(src_file))
- try:
- os.link(src_file, target)
- except OSError:
- shutil.copy(src_file, target_dir)
- # END handle cross device links ( and resulting failure )
- else:
- shutil.copy(src_file, target_dir)
- # END try hard link
- # END for each file to copy
-
+ """Copy all files found according to the given source glob into the target directory
+ :param hard_link_ok: if True, hard links will be created if possible. Otherwise
+ the files will be copied"""
+ for src_file in glob.glob(source_glob):
+ if hard_link_ok and hasattr(os, 'link'):
+ target = os.path.join(target_dir, os.path.basename(src_file))
+ try:
+ os.link(src_file, target)
+ except OSError:
+ shutil.copy(src_file, target_dir)
+ # END handle cross device links ( and resulting failure )
+ else:
+ shutil.copy(src_file, target_dir)
+ # END try hard link
+ # END for each file to copy
+
def make_bytes(size_in_bytes, randomize=False):
- """:return: string with given size in bytes
- :param randomize: try to produce a very random stream"""
- actual_size = size_in_bytes / 4
- producer = xrange(actual_size)
- if randomize:
- producer = list(producer)
- random.shuffle(producer)
- # END randomize
- a = array('i', producer)
- return a.tostring()
+ """:return: string with given size in bytes
+ :param randomize: try to produce a very random stream"""
+ actual_size = size_in_bytes / 4
+ producer = xrange(actual_size)
+ if randomize:
+ producer = list(producer)
+ random.shuffle(producer)
+ # END randomize
+ a = array('i', producer)
+ return a.tostring()
def make_object(type, data):
- """:return: bytes resembling an uncompressed object"""
- odata = "blob %i\0" % len(data)
- return odata + data
-
+ """:return: bytes resembling an uncompressed object"""
+ odata = "blob %i\0" % len(data)
+ return odata + data
+
def make_memory_file(size_in_bytes, randomize=False):
- """:return: tuple(size_of_stream, stream)
- :param randomize: try to produce a very random stream"""
- d = make_bytes(size_in_bytes, randomize)
- return len(d), StringIO(d)
+ """:return: tuple(size_of_stream, stream)
+ :param randomize: try to produce a very random stream"""
+ d = make_bytes(size_in_bytes, randomize)
+ return len(d), StringIO(d)
#} END routines
#{ Stream Utilities
class DummyStream(object):
- def __init__(self):
- self.was_read = False
- self.bytes = 0
- self.closed = False
-
- def read(self, size):
- self.was_read = True
- self.bytes = size
-
- def close(self):
- self.closed = True
-
- def _assert(self):
- assert self.was_read
+ def __init__(self):
+ self.was_read = False
+ self.bytes = 0
+ self.closed = False
+
+ def read(self, size):
+ self.was_read = True
+ self.bytes = size
+
+ def close(self):
+ self.closed = True
+
+ def _assert(self):
+ assert self.was_read
class DeriveTest(OStream):
- def __init__(self, sha, type, size, stream, *args, **kwargs):
- self.myarg = kwargs.pop('myarg')
- self.args = args
-
- def _assert(self):
- assert self.args
- assert self.myarg
+ def __init__(self, sha, type, size, stream, *args, **kwargs):
+ self.myarg = kwargs.pop('myarg')
+ self.args = args
+
+ def _assert(self):
+ assert self.args
+ assert self.myarg
#} END stream utilities
diff --git a/gitdb/test/performance/lib.py b/gitdb/test/performance/lib.py
index 761113d..3563fcf 100644
--- a/gitdb/test/performance/lib.py
+++ b/gitdb/test/performance/lib.py
@@ -16,12 +16,12 @@ k_env_git_repo = "GITDB_TEST_GIT_REPO_BASE"
#{ Utilities
def resolve_or_fail(env_var):
- """:return: resolved environment variable or raise EnvironmentError"""
- try:
- return os.environ[env_var]
- except KeyError:
- raise EnvironmentError("Please set the %r environment variable and retry" % env_var)
- # END exception handling
+ """:return: resolved environment variable or raise EnvironmentError"""
+ try:
+ return os.environ[env_var]
+ except KeyError:
+ raise EnvironmentError("Please set the %r environment variable and retry" % env_var)
+ # END exception handling
#} END utilities
@@ -29,26 +29,26 @@ def resolve_or_fail(env_var):
#{ Base Classes
class TestBigRepoR(TestBase):
- """TestCase providing access to readonly 'big' repositories using the following
- member variables:
-
- * gitrepopath
-
- * read-only base path of the git source repository, i.e. .../git/.git"""
-
- #{ Invariants
- head_sha_2k = '235d521da60e4699e5bd59ac658b5b48bd76ddca'
- head_sha_50 = '32347c375250fd470973a5d76185cac718955fd5'
- #} END invariants
-
- @classmethod
- def setUpAll(cls):
- try:
- super(TestBigRepoR, cls).setUpAll()
- except AttributeError:
- pass
- cls.gitrepopath = resolve_or_fail(k_env_git_repo)
- assert cls.gitrepopath.endswith('.git')
-
-
+ """TestCase providing access to readonly 'big' repositories using the following
+ member variables:
+
+ * gitrepopath
+
+ * read-only base path of the git source repository, i.e. .../git/.git"""
+
+ #{ Invariants
+ head_sha_2k = '235d521da60e4699e5bd59ac658b5b48bd76ddca'
+ head_sha_50 = '32347c375250fd470973a5d76185cac718955fd5'
+ #} END invariants
+
+ @classmethod
+ def setUpAll(cls):
+ try:
+ super(TestBigRepoR, cls).setUpAll()
+ except AttributeError:
+ pass
+ cls.gitrepopath = resolve_or_fail(k_env_git_repo)
+ assert cls.gitrepopath.endswith('.git')
+
+
#} END base classes
diff --git a/gitdb/test/performance/test_pack.py b/gitdb/test/performance/test_pack.py
index 2061802..63856e2 100644
--- a/gitdb/test/performance/test_pack.py
+++ b/gitdb/test/performance/test_pack.py
@@ -4,8 +4,8 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Performance tests for object store"""
from lib import (
- TestBigRepoR
- )
+ TestBigRepoR
+ )
from gitdb.exc import UnsupportedOperation
from gitdb.db.pack import PackedDB
@@ -18,76 +18,76 @@ import random
from nose import SkipTest
class TestPackedDBPerformance(TestBigRepoR):
-
- def test_pack_random_access(self):
- pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack"))
-
- # sha lookup
- st = time()
- sha_list = list(pdb.sha_iter())
- elapsed = time() - st
- ns = len(sha_list)
- print >> sys.stderr, "PDB: looked up %i shas by index in %f s ( %f shas/s )" % (ns, elapsed, ns / elapsed)
-
- # sha lookup: best-case and worst case access
- pdb_pack_info = pdb._pack_info
- # END shuffle shas
- st = time()
- for sha in sha_list:
- pdb_pack_info(sha)
- # END for each sha to look up
- elapsed = time() - st
-
- # discard cache
- del(pdb._entities)
- pdb.entities()
- print >> sys.stderr, "PDB: looked up %i sha in %i packs in %f s ( %f shas/s )" % (ns, len(pdb.entities()), elapsed, ns / elapsed)
- # END for each random mode
-
- # query info and streams only
- max_items = 10000 # can wait longer when testing memory
- for pdb_fun in (pdb.info, pdb.stream):
- st = time()
- for sha in sha_list[:max_items]:
- pdb_fun(sha)
- elapsed = time() - st
- print >> sys.stderr, "PDB: Obtained %i object %s by sha in %f s ( %f items/s )" % (max_items, pdb_fun.__name__.upper(), elapsed, max_items / elapsed)
- # END for each function
-
- # retrieve stream and read all
- max_items = 5000
- pdb_stream = pdb.stream
- total_size = 0
- st = time()
- for sha in sha_list[:max_items]:
- stream = pdb_stream(sha)
- stream.read()
- total_size += stream.size
- elapsed = time() - st
- total_kib = total_size / 1000
- print >> sys.stderr, "PDB: Obtained %i streams by sha and read all bytes totallying %i KiB ( %f KiB / s ) in %f s ( %f streams/s )" % (max_items, total_kib, total_kib/elapsed , elapsed, max_items / elapsed)
-
- def test_correctness(self):
- raise SkipTest("Takes too long, enable it if you change the algorithm and want to be sure you decode packs correctly")
- pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack"))
- # disabled for now as it used to work perfectly, checking big repositories takes a long time
- print >> sys.stderr, "Endurance run: verify streaming of objects (crc and sha)"
- for crc in range(2):
- count = 0
- st = time()
- for entity in pdb.entities():
- pack_verify = entity.is_valid_stream
- sha_by_index = entity.index().sha
- for index in xrange(entity.index().size()):
- try:
- assert pack_verify(sha_by_index(index), use_crc=crc)
- count += 1
- except UnsupportedOperation:
- pass
- # END ignore old indices
- # END for each index
- # END for each entity
- elapsed = time() - st
- print >> sys.stderr, "PDB: verified %i objects (crc=%i) in %f s ( %f objects/s )" % (count, crc, elapsed, count / elapsed)
- # END for each verify mode
-
+
+ def test_pack_random_access(self):
+ pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack"))
+
+ # sha lookup
+ st = time()
+ sha_list = list(pdb.sha_iter())
+ elapsed = time() - st
+ ns = len(sha_list)
+ print >> sys.stderr, "PDB: looked up %i shas by index in %f s ( %f shas/s )" % (ns, elapsed, ns / elapsed)
+
+ # sha lookup: best-case and worst case access
+ pdb_pack_info = pdb._pack_info
+ # END shuffle shas
+ st = time()
+ for sha in sha_list:
+ pdb_pack_info(sha)
+ # END for each sha to look up
+ elapsed = time() - st
+
+ # discard cache
+ del(pdb._entities)
+ pdb.entities()
+ print >> sys.stderr, "PDB: looked up %i sha in %i packs in %f s ( %f shas/s )" % (ns, len(pdb.entities()), elapsed, ns / elapsed)
+ # END for each random mode
+
+ # query info and streams only
+ max_items = 10000 # can wait longer when testing memory
+ for pdb_fun in (pdb.info, pdb.stream):
+ st = time()
+ for sha in sha_list[:max_items]:
+ pdb_fun(sha)
+ elapsed = time() - st
+ print >> sys.stderr, "PDB: Obtained %i object %s by sha in %f s ( %f items/s )" % (max_items, pdb_fun.__name__.upper(), elapsed, max_items / elapsed)
+ # END for each function
+
+ # retrieve stream and read all
+ max_items = 5000
+ pdb_stream = pdb.stream
+ total_size = 0
+ st = time()
+ for sha in sha_list[:max_items]:
+ stream = pdb_stream(sha)
+ stream.read()
+ total_size += stream.size
+ elapsed = time() - st
+ total_kib = total_size / 1000
+ print >> sys.stderr, "PDB: Obtained %i streams by sha and read all bytes totallying %i KiB ( %f KiB / s ) in %f s ( %f streams/s )" % (max_items, total_kib, total_kib/elapsed , elapsed, max_items / elapsed)
+
+ def test_correctness(self):
+ raise SkipTest("Takes too long, enable it if you change the algorithm and want to be sure you decode packs correctly")
+ pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack"))
+ # disabled for now as it used to work perfectly, checking big repositories takes a long time
+ print >> sys.stderr, "Endurance run: verify streaming of objects (crc and sha)"
+ for crc in range(2):
+ count = 0
+ st = time()
+ for entity in pdb.entities():
+ pack_verify = entity.is_valid_stream
+ sha_by_index = entity.index().sha
+ for index in xrange(entity.index().size()):
+ try:
+ assert pack_verify(sha_by_index(index), use_crc=crc)
+ count += 1
+ except UnsupportedOperation:
+ pass
+ # END ignore old indices
+ # END for each index
+ # END for each entity
+ elapsed = time() - st
+ print >> sys.stderr, "PDB: verified %i objects (crc=%i) in %f s ( %f objects/s )" % (count, crc, elapsed, count / elapsed)
+ # END for each verify mode
+
diff --git a/gitdb/test/performance/test_pack_streaming.py b/gitdb/test/performance/test_pack_streaming.py
index 3c40ed0..c66e60c 100644
--- a/gitdb/test/performance/test_pack_streaming.py
+++ b/gitdb/test/performance/test_pack_streaming.py
@@ -4,8 +4,8 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Specific test for pack streams only"""
from lib import (
- TestBigRepoR
- )
+ TestBigRepoR
+ )
from gitdb.db.pack import PackedDB
from gitdb.stream import NullStream
@@ -17,63 +17,63 @@ from time import time
from nose import SkipTest
class CountedNullStream(NullStream):
- __slots__ = '_bw'
- def __init__(self):
- self._bw = 0
-
- def bytes_written(self):
- return self._bw
-
- def write(self, d):
- self._bw += NullStream.write(self, d)
-
+ __slots__ = '_bw'
+ def __init__(self):
+ self._bw = 0
+
+ def bytes_written(self):
+ return self._bw
+
+ def write(self, d):
+ self._bw += NullStream.write(self, d)
+
class TestPackStreamingPerformance(TestBigRepoR):
-
- def test_pack_writing(self):
- # see how fast we can write a pack from object streams.
- # This will not be fast, as we take time for decompressing the streams as well
- ostream = CountedNullStream()
- pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack"))
-
- ni = 5000
- count = 0
- total_size = 0
- st = time()
- for sha in pdb.sha_iter():
- count += 1
- pdb.stream(sha)
- if count == ni:
- break
- #END gather objects for pack-writing
- elapsed = time() - st
- print >> sys.stderr, "PDB Streaming: Got %i streams by sha in in %f s ( %f streams/s )" % (ni, elapsed, ni / elapsed)
-
- st = time()
- PackEntity.write_pack((pdb.stream(sha) for sha in pdb.sha_iter()), ostream.write, object_count=ni)
- elapsed = time() - st
- total_kb = ostream.bytes_written() / 1000
- print >> sys.stderr, "PDB Streaming: Wrote pack of size %i kb in %f s (%f kb/s)" % (total_kb, elapsed, total_kb/elapsed)
-
-
- def test_stream_reading(self):
- raise SkipTest()
- pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack"))
-
- # streaming only, meant for --with-profile runs
- ni = 5000
- count = 0
- pdb_stream = pdb.stream
- total_size = 0
- st = time()
- for sha in pdb.sha_iter():
- if count == ni:
- break
- stream = pdb_stream(sha)
- stream.read()
- total_size += stream.size
- count += 1
- elapsed = time() - st
- total_kib = total_size / 1000
- print >> sys.stderr, "PDB Streaming: Got %i streams by sha and read all bytes totallying %i KiB ( %f KiB / s ) in %f s ( %f streams/s )" % (ni, total_kib, total_kib/elapsed , elapsed, ni / elapsed)
-
+
+ def test_pack_writing(self):
+ # see how fast we can write a pack from object streams.
+ # This will not be fast, as we take time for decompressing the streams as well
+ ostream = CountedNullStream()
+ pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack"))
+
+ ni = 5000
+ count = 0
+ total_size = 0
+ st = time()
+ for sha in pdb.sha_iter():
+ count += 1
+ pdb.stream(sha)
+ if count == ni:
+ break
+ #END gather objects for pack-writing
+ elapsed = time() - st
+ print >> sys.stderr, "PDB Streaming: Got %i streams by sha in in %f s ( %f streams/s )" % (ni, elapsed, ni / elapsed)
+
+ st = time()
+ PackEntity.write_pack((pdb.stream(sha) for sha in pdb.sha_iter()), ostream.write, object_count=ni)
+ elapsed = time() - st
+ total_kb = ostream.bytes_written() / 1000
+ print >> sys.stderr, "PDB Streaming: Wrote pack of size %i kb in %f s (%f kb/s)" % (total_kb, elapsed, total_kb/elapsed)
+
+
+ def test_stream_reading(self):
+ raise SkipTest()
+ pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack"))
+
+ # streaming only, meant for --with-profile runs
+ ni = 5000
+ count = 0
+ pdb_stream = pdb.stream
+ total_size = 0
+ st = time()
+ for sha in pdb.sha_iter():
+ if count == ni:
+ break
+ stream = pdb_stream(sha)
+ stream.read()
+ total_size += stream.size
+ count += 1
+ elapsed = time() - st
+ total_kib = total_size / 1000
+ print >> sys.stderr, "PDB Streaming: Got %i streams by sha and read all bytes totallying %i KiB ( %f KiB / s ) in %f s ( %f streams/s )" % (ni, total_kib, total_kib/elapsed , elapsed, ni / elapsed)
+
diff --git a/gitdb/test/performance/test_stream.py b/gitdb/test/performance/test_stream.py
index f5f2e2e..010003d 100644
--- a/gitdb/test/performance/test_stream.py
+++ b/gitdb/test/performance/test_stream.py
@@ -8,16 +8,16 @@ from gitdb.db import *
from gitdb.base import *
from gitdb.stream import *
from gitdb.util import (
- pool,
- bin_to_hex
- )
+ pool,
+ bin_to_hex
+ )
from gitdb.typ import str_blob_type
from gitdb.fun import chunk_size
from async import (
- IteratorReader,
- ChannelThreadTask,
- )
+ IteratorReader,
+ ChannelThreadTask,
+ )
from cStringIO import StringIO
from time import time
@@ -28,168 +28,168 @@ import subprocess
from lib import (
- TestBigRepoR,
- make_memory_file,
- with_rw_directory
- )
+ TestBigRepoR,
+ make_memory_file,
+ with_rw_directory
+ )
#{ Utilities
def read_chunked_stream(stream):
- total = 0
- while True:
- chunk = stream.read(chunk_size)
- total += len(chunk)
- if len(chunk) < chunk_size:
- break
- # END read stream loop
- assert total == stream.size
- return stream
-
-
+ total = 0
+ while True:
+ chunk = stream.read(chunk_size)
+ total += len(chunk)
+ if len(chunk) < chunk_size:
+ break
+ # END read stream loop
+ assert total == stream.size
+ return stream
+
+
class TestStreamReader(ChannelThreadTask):
- """Expects input streams and reads them in chunks. It will read one at a time,
- requiring a queue chunk of size 1"""
- def __init__(self, *args):
- super(TestStreamReader, self).__init__(*args)
- self.fun = read_chunked_stream
- self.max_chunksize = 1
-
+ """Expects input streams and reads them in chunks. It will read one at a time,
+ requiring a queue chunk of size 1"""
+ def __init__(self, *args):
+ super(TestStreamReader, self).__init__(*args)
+ self.fun = read_chunked_stream
+ self.max_chunksize = 1
+
#} END utilities
class TestObjDBPerformance(TestBigRepoR):
-
- large_data_size_bytes = 1000*1000*50 # some MiB should do it
- moderate_data_size_bytes = 1000*1000*1 # just 1 MiB
-
- @with_rw_directory
- def test_large_data_streaming(self, path):
- ldb = LooseObjectDB(path)
- string_ios = list() # list of streams we previously created
-
- # serial mode
- for randomize in range(2):
- desc = (randomize and 'random ') or ''
- print >> sys.stderr, "Creating %s data ..." % desc
- st = time()
- size, stream = make_memory_file(self.large_data_size_bytes, randomize)
- elapsed = time() - st
- print >> sys.stderr, "Done (in %f s)" % elapsed
- string_ios.append(stream)
-
- # writing - due to the compression it will seem faster than it is
- st = time()
- sha = ldb.store(IStream('blob', size, stream)).binsha
- elapsed_add = time() - st
- assert ldb.has_object(sha)
- db_file = ldb.readable_db_object_path(bin_to_hex(sha))
- fsize_kib = os.path.getsize(db_file) / 1000
-
-
- size_kib = size / 1000
- print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add)
-
- # reading all at once
- st = time()
- ostream = ldb.stream(sha)
- shadata = ostream.read()
- elapsed_readall = time() - st
-
- stream.seek(0)
- assert shadata == stream.getvalue()
- print >> sys.stderr, "Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall)
-
-
- # reading in chunks of 1 MiB
- cs = 512*1000
- chunks = list()
- st = time()
- ostream = ldb.stream(sha)
- while True:
- data = ostream.read(cs)
- chunks.append(data)
- if len(data) < cs:
- break
- # END read in chunks
- elapsed_readchunks = time() - st
-
- stream.seek(0)
- assert ''.join(chunks) == stream.getvalue()
-
- cs_kib = cs / 1000
- print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks)
-
- # del db file so we keep something to do
- os.remove(db_file)
- # END for each randomization factor
-
-
- # multi-threaded mode
- # want two, should be supported by most of todays cpus
- pool.set_size(2)
- total_kib = 0
- nsios = len(string_ios)
- for stream in string_ios:
- stream.seek(0)
- total_kib += len(stream.getvalue()) / 1000
- # END rewind
-
- def istream_iter():
- for stream in string_ios:
- stream.seek(0)
- yield IStream(str_blob_type, len(stream.getvalue()), stream)
- # END for each stream
- # END util
-
- # write multiple objects at once, involving concurrent compression
- reader = IteratorReader(istream_iter())
- istream_reader = ldb.store_async(reader)
- istream_reader.task().max_chunksize = 1
-
- st = time()
- istreams = istream_reader.read(nsios)
- assert len(istreams) == nsios
- elapsed = time() - st
-
- print >> sys.stderr, "Threads(%i): Compressed %i KiB of data in loose odb in %f s ( %f Write KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
-
- # decompress multiple at once, by reading them
- # chunk size is not important as the stream will not really be decompressed
-
- # until its read
- istream_reader = IteratorReader(iter([ i.binsha for i in istreams ]))
- ostream_reader = ldb.stream_async(istream_reader)
-
- chunk_task = TestStreamReader(ostream_reader, "chunker", None)
- output_reader = pool.add_task(chunk_task)
- output_reader.task().max_chunksize = 1
-
- st = time()
- assert len(output_reader.read(nsios)) == nsios
- elapsed = time() - st
-
- print >> sys.stderr, "Threads(%i): Decompressed %i KiB of data in loose odb in %f s ( %f Read KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
-
- # store the files, and read them back. For the reading, we use a task
- # as well which is chunked into one item per task. Reading all will
- # very quickly result in two threads handling two bytestreams of
- # chained compression/decompression streams
- reader = IteratorReader(istream_iter())
- istream_reader = ldb.store_async(reader)
- istream_reader.task().max_chunksize = 1
-
- istream_to_sha = lambda items: [ i.binsha for i in items ]
- istream_reader.set_post_cb(istream_to_sha)
-
- ostream_reader = ldb.stream_async(istream_reader)
-
- chunk_task = TestStreamReader(ostream_reader, "chunker", None)
- output_reader = pool.add_task(chunk_task)
- output_reader.max_chunksize = 1
-
- st = time()
- assert len(output_reader.read(nsios)) == nsios
- elapsed = time() - st
-
- print >> sys.stderr, "Threads(%i): Compressed and decompressed and read %i KiB of data in loose odb in %f s ( %f Combined KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
+
+ large_data_size_bytes = 1000*1000*50 # some MiB should do it
+ moderate_data_size_bytes = 1000*1000*1 # just 1 MiB
+
+ @with_rw_directory
+ def test_large_data_streaming(self, path):
+ ldb = LooseObjectDB(path)
+ string_ios = list() # list of streams we previously created
+
+ # serial mode
+ for randomize in range(2):
+ desc = (randomize and 'random ') or ''
+ print >> sys.stderr, "Creating %s data ..." % desc
+ st = time()
+ size, stream = make_memory_file(self.large_data_size_bytes, randomize)
+ elapsed = time() - st
+ print >> sys.stderr, "Done (in %f s)" % elapsed
+ string_ios.append(stream)
+
+ # writing - due to the compression it will seem faster than it is
+ st = time()
+ sha = ldb.store(IStream('blob', size, stream)).binsha
+ elapsed_add = time() - st
+ assert ldb.has_object(sha)
+ db_file = ldb.readable_db_object_path(bin_to_hex(sha))
+ fsize_kib = os.path.getsize(db_file) / 1000
+
+
+ size_kib = size / 1000
+ print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add)
+
+ # reading all at once
+ st = time()
+ ostream = ldb.stream(sha)
+ shadata = ostream.read()
+ elapsed_readall = time() - st
+
+ stream.seek(0)
+ assert shadata == stream.getvalue()
+ print >> sys.stderr, "Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall)
+
+
+ # reading in chunks of 1 MiB
+ cs = 512*1000
+ chunks = list()
+ st = time()
+ ostream = ldb.stream(sha)
+ while True:
+ data = ostream.read(cs)
+ chunks.append(data)
+ if len(data) < cs:
+ break
+ # END read in chunks
+ elapsed_readchunks = time() - st
+
+ stream.seek(0)
+ assert ''.join(chunks) == stream.getvalue()
+
+ cs_kib = cs / 1000
+ print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks)
+
+ # del db file so we keep something to do
+ os.remove(db_file)
+ # END for each randomization factor
+
+
+ # multi-threaded mode
+ # want two, should be supported by most of todays cpus
+ pool.set_size(2)
+ total_kib = 0
+ nsios = len(string_ios)
+ for stream in string_ios:
+ stream.seek(0)
+ total_kib += len(stream.getvalue()) / 1000
+ # END rewind
+
+ def istream_iter():
+ for stream in string_ios:
+ stream.seek(0)
+ yield IStream(str_blob_type, len(stream.getvalue()), stream)
+ # END for each stream
+ # END util
+
+ # write multiple objects at once, involving concurrent compression
+ reader = IteratorReader(istream_iter())
+ istream_reader = ldb.store_async(reader)
+ istream_reader.task().max_chunksize = 1
+
+ st = time()
+ istreams = istream_reader.read(nsios)
+ assert len(istreams) == nsios
+ elapsed = time() - st
+
+ print >> sys.stderr, "Threads(%i): Compressed %i KiB of data in loose odb in %f s ( %f Write KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
+
+ # decompress multiple at once, by reading them
+ # chunk size is not important as the stream will not really be decompressed
+
+ # until its read
+ istream_reader = IteratorReader(iter([ i.binsha for i in istreams ]))
+ ostream_reader = ldb.stream_async(istream_reader)
+
+ chunk_task = TestStreamReader(ostream_reader, "chunker", None)
+ output_reader = pool.add_task(chunk_task)
+ output_reader.task().max_chunksize = 1
+
+ st = time()
+ assert len(output_reader.read(nsios)) == nsios
+ elapsed = time() - st
+
+ print >> sys.stderr, "Threads(%i): Decompressed %i KiB of data in loose odb in %f s ( %f Read KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
+
+ # store the files, and read them back. For the reading, we use a task
+ # as well which is chunked into one item per task. Reading all will
+ # very quickly result in two threads handling two bytestreams of
+ # chained compression/decompression streams
+ reader = IteratorReader(istream_iter())
+ istream_reader = ldb.store_async(reader)
+ istream_reader.task().max_chunksize = 1
+
+ istream_to_sha = lambda items: [ i.binsha for i in items ]
+ istream_reader.set_post_cb(istream_to_sha)
+
+ ostream_reader = ldb.stream_async(istream_reader)
+
+ chunk_task = TestStreamReader(ostream_reader, "chunker", None)
+ output_reader = pool.add_task(chunk_task)
+ output_reader.max_chunksize = 1
+
+ st = time()
+ assert len(output_reader.read(nsios)) == nsios
+ elapsed = time() - st
+
+ print >> sys.stderr, "Threads(%i): Compressed and decompressed and read %i KiB of data in loose odb in %f s ( %f Combined KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed)
diff --git a/gitdb/test/test_base.py b/gitdb/test/test_base.py
index 1b20faf..d4ce428 100644
--- a/gitdb/test/test_base.py
+++ b/gitdb/test/test_base.py
@@ -4,95 +4,95 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""
from lib import (
- TestBase,
- DummyStream,
- DeriveTest,
- )
+ TestBase,
+ DummyStream,
+ DeriveTest,
+ )
from gitdb import *
from gitdb.util import (
- NULL_BIN_SHA
- )
+ NULL_BIN_SHA
+ )
from gitdb.typ import (
- str_blob_type
- )
+ str_blob_type
+ )
class TestBaseTypes(TestBase):
-
- def test_streams(self):
- # test info
- sha = NULL_BIN_SHA
- s = 20
- blob_id = 3
-
- info = OInfo(sha, str_blob_type, s)
- assert info.binsha == sha
- assert info.type == str_blob_type
- assert info.type_id == blob_id
- assert info.size == s
-
- # test pack info
- # provides type_id
- pinfo = OPackInfo(0, blob_id, s)
- assert pinfo.type == str_blob_type
- assert pinfo.type_id == blob_id
- assert pinfo.pack_offset == 0
-
- dpinfo = ODeltaPackInfo(0, blob_id, s, sha)
- assert dpinfo.type == str_blob_type
- assert dpinfo.type_id == blob_id
- assert dpinfo.delta_info == sha
- assert dpinfo.pack_offset == 0
-
-
- # test ostream
- stream = DummyStream()
- ostream = OStream(*(info + (stream, )))
- assert ostream.stream is stream
- ostream.read(15)
- stream._assert()
- assert stream.bytes == 15
- ostream.read(20)
- assert stream.bytes == 20
-
- # test packstream
- postream = OPackStream(*(pinfo + (stream, )))
- assert postream.stream is stream
- postream.read(10)
- stream._assert()
- assert stream.bytes == 10
-
- # test deltapackstream
- dpostream = ODeltaPackStream(*(dpinfo + (stream, )))
- dpostream.stream is stream
- dpostream.read(5)
- stream._assert()
- assert stream.bytes == 5
-
- # derive with own args
- DeriveTest(sha, str_blob_type, s, stream, 'mine',myarg = 3)._assert()
-
- # test istream
- istream = IStream(str_blob_type, s, stream)
- assert istream.binsha == None
- istream.binsha = sha
- assert istream.binsha == sha
-
- assert len(istream.binsha) == 20
- assert len(istream.hexsha) == 40
-
- assert istream.size == s
- istream.size = s * 2
- istream.size == s * 2
- assert istream.type == str_blob_type
- istream.type = "something"
- assert istream.type == "something"
- assert istream.stream is stream
- istream.stream = None
- assert istream.stream is None
-
- assert istream.error is None
- istream.error = Exception()
- assert isinstance(istream.error, Exception)
+
+ def test_streams(self):
+ # test info
+ sha = NULL_BIN_SHA
+ s = 20
+ blob_id = 3
+
+ info = OInfo(sha, str_blob_type, s)
+ assert info.binsha == sha
+ assert info.type == str_blob_type
+ assert info.type_id == blob_id
+ assert info.size == s
+
+ # test pack info
+ # provides type_id
+ pinfo = OPackInfo(0, blob_id, s)
+ assert pinfo.type == str_blob_type
+ assert pinfo.type_id == blob_id
+ assert pinfo.pack_offset == 0
+
+ dpinfo = ODeltaPackInfo(0, blob_id, s, sha)
+ assert dpinfo.type == str_blob_type
+ assert dpinfo.type_id == blob_id
+ assert dpinfo.delta_info == sha
+ assert dpinfo.pack_offset == 0
+
+
+ # test ostream
+ stream = DummyStream()
+ ostream = OStream(*(info + (stream, )))
+ assert ostream.stream is stream
+ ostream.read(15)
+ stream._assert()
+ assert stream.bytes == 15
+ ostream.read(20)
+ assert stream.bytes == 20
+
+ # test packstream
+ postream = OPackStream(*(pinfo + (stream, )))
+ assert postream.stream is stream
+ postream.read(10)
+ stream._assert()
+ assert stream.bytes == 10
+
+ # test deltapackstream
+ dpostream = ODeltaPackStream(*(dpinfo + (stream, )))
+        assert dpostream.stream is stream
+ dpostream.read(5)
+ stream._assert()
+ assert stream.bytes == 5
+
+ # derive with own args
+        DeriveTest(sha, str_blob_type, s, stream, 'mine', myarg=3)._assert()
+
+ # test istream
+ istream = IStream(str_blob_type, s, stream)
+ assert istream.binsha == None
+ istream.binsha = sha
+ assert istream.binsha == sha
+
+ assert len(istream.binsha) == 20
+ assert len(istream.hexsha) == 40
+
+ assert istream.size == s
+ istream.size = s * 2
+        assert istream.size == s * 2
+ assert istream.type == str_blob_type
+ istream.type = "something"
+ assert istream.type == "something"
+ assert istream.stream is stream
+ istream.stream = None
+ assert istream.stream is None
+
+ assert istream.error is None
+ istream.error = Exception()
+ assert isinstance(istream.error, Exception)
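
The OStream(*(info + (stream, ))) construction above works because the info types are plain tuples underneath. A minimal sketch of that relationship, reusing the names from the test:

    from cStringIO import StringIO
    from gitdb.base import OInfo, OStream
    from gitdb.util import NULL_BIN_SHA
    from gitdb.typ import str_blob_type

    info = OInfo(NULL_BIN_SHA, str_blob_type, 5)        # behaves like (binsha, type, size)
    ostream = OStream(*(info + (StringIO("hello"), )))  # same fields plus the stream
    assert ostream.binsha == info.binsha and ostream.size == info.size
    assert ostream.read(5) == "hello"                   # read() is delegated to the stream
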
diff --git a/gitdb/test/test_example.py b/gitdb/test/test_example.py
index 7531775..611ae42 100644
--- a/gitdb/test/test_example.py
+++ b/gitdb/test/test_example.py
@@ -7,58 +7,58 @@ from lib import *
from gitdb import IStream
from gitdb.db import LooseObjectDB
from gitdb.util import pool
-
+
from cStringIO import StringIO
from async import IteratorReader
-
+
class TestExamples(TestBase):
-
- def test_base(self):
- ldb = LooseObjectDB(fixture_path("../../../.git/objects"))
-
- for sha1 in ldb.sha_iter():
- oinfo = ldb.info(sha1)
- ostream = ldb.stream(sha1)
- assert oinfo[:3] == ostream[:3]
-
- assert len(ostream.read()) == ostream.size
- assert ldb.has_object(oinfo.binsha)
- # END for each sha in database
- # assure we close all files
- try:
- del(ostream)
- del(oinfo)
- except UnboundLocalError:
- pass
- # END ignore exception if there are no loose objects
-
- data = "my data"
- istream = IStream("blob", len(data), StringIO(data))
-
- # the object does not yet have a sha
- assert istream.binsha is None
- ldb.store(istream)
- # now the sha is set
- assert len(istream.binsha) == 20
- assert ldb.has_object(istream.binsha)
-
-
- # async operation
- # Create a reader from an iterator
- reader = IteratorReader(ldb.sha_iter())
-
- # get reader for object streams
- info_reader = ldb.stream_async(reader)
-
- # read one
- info = info_reader.read(1)[0]
-
- # read all the rest until depletion
- ostreams = info_reader.read()
-
- # set the pool to use two threads
- pool.set_size(2)
-
- # synchronize the mode of operation
- pool.set_size(0)
+
+ def test_base(self):
+ ldb = LooseObjectDB(fixture_path("../../../.git/objects"))
+
+ for sha1 in ldb.sha_iter():
+ oinfo = ldb.info(sha1)
+ ostream = ldb.stream(sha1)
+ assert oinfo[:3] == ostream[:3]
+
+ assert len(ostream.read()) == ostream.size
+ assert ldb.has_object(oinfo.binsha)
+ # END for each sha in database
+ # assure we close all files
+ try:
+ del(ostream)
+ del(oinfo)
+ except UnboundLocalError:
+ pass
+ # END ignore exception if there are no loose objects
+
+ data = "my data"
+ istream = IStream("blob", len(data), StringIO(data))
+
+ # the object does not yet have a sha
+ assert istream.binsha is None
+ ldb.store(istream)
+ # now the sha is set
+ assert len(istream.binsha) == 20
+ assert ldb.has_object(istream.binsha)
+
+
+ # async operation
+ # Create a reader from an iterator
+ reader = IteratorReader(ldb.sha_iter())
+
+ # get reader for object streams
+ info_reader = ldb.stream_async(reader)
+
+ # read one
+ info = info_reader.read(1)[0]
+
+ # read all the rest until depletion
+ ostreams = info_reader.read()
+
+ # set the pool to use two threads
+ pool.set_size(2)
+
+ # synchronize the mode of operation
+ pool.set_size(0)
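
Condensed from the example above, the synchronous round trip looks like the sketch below; the objects directory is a placeholder and must point at an existing loose object database.

    from cStringIO import StringIO
    from gitdb import IStream
    from gitdb.db import LooseObjectDB

    ldb = LooseObjectDB("/path/to/.git/objects")  # placeholder path
    data = "my data"
    istream = IStream("blob", len(data), StringIO(data))

    ldb.store(istream)                            # computes and sets istream.binsha
    assert ldb.has_object(istream.binsha)
    assert ldb.stream(istream.binsha).read() == data
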
diff --git a/gitdb/test/test_pack.py b/gitdb/test/test_pack.py
index 4a7f1ca..779155a 100644
--- a/gitdb/test/test_pack.py
+++ b/gitdb/test/test_pack.py
@@ -4,23 +4,23 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test everything about packs reading and writing"""
from lib import (
- TestBase,
- with_rw_directory,
- with_packs_rw,
- fixture_path
- )
+ TestBase,
+ with_rw_directory,
+ with_packs_rw,
+ fixture_path
+ )
from gitdb.stream import DeltaApplyReader
from gitdb.pack import (
- PackEntity,
- PackIndexFile,
- PackFile
- )
+ PackEntity,
+ PackIndexFile,
+ PackFile
+ )
from gitdb.base import (
- OInfo,
- OStream,
- )
+ OInfo,
+ OStream,
+ )
from gitdb.fun import delta_types
from gitdb.exc import UnsupportedOperation
@@ -35,213 +35,213 @@ import tempfile
#{ Utilities
def bin_sha_from_filename(filename):
- return to_bin_sha(os.path.splitext(os.path.basename(filename))[0][5:])
+ return to_bin_sha(os.path.splitext(os.path.basename(filename))[0][5:])
#} END utilities
class TestPack(TestBase):
-
- packindexfile_v1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.idx'), 1, 67)
- packindexfile_v2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.idx'), 2, 30)
- packindexfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.idx'), 2, 42)
- packfile_v2_1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.pack'), 2, packindexfile_v1[2])
- packfile_v2_2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack'), 2, packindexfile_v2[2])
- packfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.pack'), 2, packindexfile_v2_3_ascii[2])
-
-
- def _assert_index_file(self, index, version, size):
- assert index.packfile_checksum() != index.indexfile_checksum()
- assert len(index.packfile_checksum()) == 20
- assert len(index.indexfile_checksum()) == 20
- assert index.version() == version
- assert index.size() == size
- assert len(index.offsets()) == size
-
- # get all data of all objects
- for oidx in xrange(index.size()):
- sha = index.sha(oidx)
- assert oidx == index.sha_to_index(sha)
-
- entry = index.entry(oidx)
- assert len(entry) == 3
-
- assert entry[0] == index.offset(oidx)
- assert entry[1] == sha
- assert entry[2] == index.crc(oidx)
-
- # verify partial sha
- for l in (4,8,11,17,20):
- assert index.partial_sha_to_index(sha[:l], l*2) == oidx
-
- # END for each object index in indexfile
- self.failUnlessRaises(ValueError, index.partial_sha_to_index, "\0", 2)
-
-
- def _assert_pack_file(self, pack, version, size):
- assert pack.version() == 2
- assert pack.size() == size
- assert len(pack.checksum()) == 20
-
- num_obj = 0
- for obj in pack.stream_iter():
- num_obj += 1
- info = pack.info(obj.pack_offset)
- stream = pack.stream(obj.pack_offset)
-
- assert info.pack_offset == stream.pack_offset
- assert info.type_id == stream.type_id
- assert hasattr(stream, 'read')
-
- # it should be possible to read from both streams
- assert obj.read() == stream.read()
-
- streams = pack.collect_streams(obj.pack_offset)
- assert streams
-
- # read the stream
- try:
- dstream = DeltaApplyReader.new(streams)
- except ValueError:
- # ignore these, old git versions use only ref deltas,
- # which we havent resolved ( as we are without an index )
- # Also ignore non-delta streams
- continue
- # END get deltastream
-
- # read all
- data = dstream.read()
- assert len(data) == dstream.size
-
- # test seek
- dstream.seek(0)
- assert dstream.read() == data
-
-
- # read chunks
- # NOTE: the current implementation is safe, it basically transfers
- # all calls to the underlying memory map
-
- # END for each object
- assert num_obj == size
-
-
- def test_pack_index(self):
- # check version 1 and 2
- for indexfile, version, size in (self.packindexfile_v1, self.packindexfile_v2):
- index = PackIndexFile(indexfile)
- self._assert_index_file(index, version, size)
- # END run tests
-
- def test_pack(self):
- # there is this special version 3, but apparently its like 2 ...
- for packfile, version, size in (self.packfile_v2_3_ascii, self.packfile_v2_1, self.packfile_v2_2):
- pack = PackFile(packfile)
- self._assert_pack_file(pack, version, size)
- # END for each pack to test
-
- @with_rw_directory
- def test_pack_entity(self, rw_dir):
- pack_objs = list()
- for packinfo, indexinfo in ( (self.packfile_v2_1, self.packindexfile_v1),
- (self.packfile_v2_2, self.packindexfile_v2),
- (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):
- packfile, version, size = packinfo
- indexfile, version, size = indexinfo
- entity = PackEntity(packfile)
- assert entity.pack().path() == packfile
- assert entity.index().path() == indexfile
- pack_objs.extend(entity.stream_iter())
-
- count = 0
- for info, stream in izip(entity.info_iter(), entity.stream_iter()):
- count += 1
- assert info.binsha == stream.binsha
- assert len(info.binsha) == 20
- assert info.type_id == stream.type_id
- assert info.size == stream.size
-
- # we return fully resolved items, which is implied by the sha centric access
- assert not info.type_id in delta_types
-
- # try all calls
- assert len(entity.collect_streams(info.binsha))
- oinfo = entity.info(info.binsha)
- assert isinstance(oinfo, OInfo)
- assert oinfo.binsha is not None
- ostream = entity.stream(info.binsha)
- assert isinstance(ostream, OStream)
- assert ostream.binsha is not None
-
- # verify the stream
- try:
- assert entity.is_valid_stream(info.binsha, use_crc=True)
- except UnsupportedOperation:
- pass
- # END ignore version issues
- assert entity.is_valid_stream(info.binsha, use_crc=False)
- # END for each info, stream tuple
- assert count == size
-
- # END for each entity
-
- # pack writing - write all packs into one
- # index path can be None
- pack_path = tempfile.mktemp('', "pack", rw_dir)
- index_path = tempfile.mktemp('', 'index', rw_dir)
- iteration = 0
- def rewind_streams():
- for obj in pack_objs:
- obj.stream.seek(0)
- #END utility
- for ppath, ipath, num_obj in zip((pack_path, )*2, (index_path, None), (len(pack_objs), None)):
- pfile = open(ppath, 'wb')
- iwrite = None
- if ipath:
- ifile = open(ipath, 'wb')
- iwrite = ifile.write
- #END handle ip
-
- # make sure we rewind the streams ... we work on the same objects over and over again
- if iteration > 0:
- rewind_streams()
- #END rewind streams
- iteration += 1
-
- pack_sha, index_sha = PackEntity.write_pack(pack_objs, pfile.write, iwrite, object_count=num_obj)
- pfile.close()
- assert os.path.getsize(ppath) > 100
-
- # verify pack
- pf = PackFile(ppath)
- assert pf.size() == len(pack_objs)
- assert pf.version() == PackFile.pack_version_default
- assert pf.checksum() == pack_sha
-
- # verify index
- if ipath is not None:
- ifile.close()
- assert os.path.getsize(ipath) > 100
- idx = PackIndexFile(ipath)
- assert idx.version() == PackIndexFile.index_version_default
- assert idx.packfile_checksum() == pack_sha
- assert idx.indexfile_checksum() == index_sha
- assert idx.size() == len(pack_objs)
- #END verify files exist
- #END for each packpath, indexpath pair
-
- # verify the packs throughly
- rewind_streams()
- entity = PackEntity.create(pack_objs, rw_dir)
- count = 0
- for info in entity.info_iter():
- count += 1
- for use_crc in range(2):
- assert entity.is_valid_stream(info.binsha, use_crc)
- # END for each crc mode
- #END for each info
- assert count == len(pack_objs)
-
-
- def test_pack_64(self):
- # TODO: hex-edit a pack helping us to verify that we can handle 64 byte offsets
- # of course without really needing such a huge pack
- raise SkipTest()
+
+ packindexfile_v1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.idx'), 1, 67)
+ packindexfile_v2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.idx'), 2, 30)
+ packindexfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.idx'), 2, 42)
+ packfile_v2_1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.pack'), 2, packindexfile_v1[2])
+ packfile_v2_2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack'), 2, packindexfile_v2[2])
+ packfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.pack'), 2, packindexfile_v2_3_ascii[2])
+
+
+ def _assert_index_file(self, index, version, size):
+ assert index.packfile_checksum() != index.indexfile_checksum()
+ assert len(index.packfile_checksum()) == 20
+ assert len(index.indexfile_checksum()) == 20
+ assert index.version() == version
+ assert index.size() == size
+ assert len(index.offsets()) == size
+
+ # get all data of all objects
+ for oidx in xrange(index.size()):
+ sha = index.sha(oidx)
+ assert oidx == index.sha_to_index(sha)
+
+ entry = index.entry(oidx)
+ assert len(entry) == 3
+
+ assert entry[0] == index.offset(oidx)
+ assert entry[1] == sha
+ assert entry[2] == index.crc(oidx)
+
+ # verify partial sha
+ for l in (4,8,11,17,20):
+ assert index.partial_sha_to_index(sha[:l], l*2) == oidx
+
+ # END for each object index in indexfile
+ self.failUnlessRaises(ValueError, index.partial_sha_to_index, "\0", 2)
+
+
+ def _assert_pack_file(self, pack, version, size):
+ assert pack.version() == 2
+ assert pack.size() == size
+ assert len(pack.checksum()) == 20
+
+ num_obj = 0
+ for obj in pack.stream_iter():
+ num_obj += 1
+ info = pack.info(obj.pack_offset)
+ stream = pack.stream(obj.pack_offset)
+
+ assert info.pack_offset == stream.pack_offset
+ assert info.type_id == stream.type_id
+ assert hasattr(stream, 'read')
+
+ # it should be possible to read from both streams
+ assert obj.read() == stream.read()
+
+ streams = pack.collect_streams(obj.pack_offset)
+ assert streams
+
+ # read the stream
+ try:
+ dstream = DeltaApplyReader.new(streams)
+ except ValueError:
+ # ignore these, old git versions use only ref deltas,
+                # which we haven't resolved (as we are without an index).
+ # Also ignore non-delta streams
+ continue
+ # END get deltastream
+
+ # read all
+ data = dstream.read()
+ assert len(data) == dstream.size
+
+ # test seek
+ dstream.seek(0)
+ assert dstream.read() == data
+
+
+ # read chunks
+ # NOTE: the current implementation is safe, it basically transfers
+ # all calls to the underlying memory map
+
+ # END for each object
+ assert num_obj == size
+
+
+ def test_pack_index(self):
+ # check version 1 and 2
+ for indexfile, version, size in (self.packindexfile_v1, self.packindexfile_v2):
+ index = PackIndexFile(indexfile)
+ self._assert_index_file(index, version, size)
+ # END run tests
+
+ def test_pack(self):
+        # there is a special version 3, but apparently it behaves like version 2 ...
+ for packfile, version, size in (self.packfile_v2_3_ascii, self.packfile_v2_1, self.packfile_v2_2):
+ pack = PackFile(packfile)
+ self._assert_pack_file(pack, version, size)
+ # END for each pack to test
+
+ @with_rw_directory
+ def test_pack_entity(self, rw_dir):
+ pack_objs = list()
+ for packinfo, indexinfo in ( (self.packfile_v2_1, self.packindexfile_v1),
+ (self.packfile_v2_2, self.packindexfile_v2),
+ (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)):
+ packfile, version, size = packinfo
+ indexfile, version, size = indexinfo
+ entity = PackEntity(packfile)
+ assert entity.pack().path() == packfile
+ assert entity.index().path() == indexfile
+ pack_objs.extend(entity.stream_iter())
+
+ count = 0
+ for info, stream in izip(entity.info_iter(), entity.stream_iter()):
+ count += 1
+ assert info.binsha == stream.binsha
+ assert len(info.binsha) == 20
+ assert info.type_id == stream.type_id
+ assert info.size == stream.size
+
+ # we return fully resolved items, which is implied by the sha centric access
+ assert not info.type_id in delta_types
+
+ # try all calls
+ assert len(entity.collect_streams(info.binsha))
+ oinfo = entity.info(info.binsha)
+ assert isinstance(oinfo, OInfo)
+ assert oinfo.binsha is not None
+ ostream = entity.stream(info.binsha)
+ assert isinstance(ostream, OStream)
+ assert ostream.binsha is not None
+
+ # verify the stream
+ try:
+ assert entity.is_valid_stream(info.binsha, use_crc=True)
+ except UnsupportedOperation:
+ pass
+ # END ignore version issues
+ assert entity.is_valid_stream(info.binsha, use_crc=False)
+ # END for each info, stream tuple
+ assert count == size
+
+ # END for each entity
+
+ # pack writing - write all packs into one
+ # index path can be None
+ pack_path = tempfile.mktemp('', "pack", rw_dir)
+ index_path = tempfile.mktemp('', 'index', rw_dir)
+ iteration = 0
+ def rewind_streams():
+ for obj in pack_objs:
+ obj.stream.seek(0)
+ #END utility
+ for ppath, ipath, num_obj in zip((pack_path, )*2, (index_path, None), (len(pack_objs), None)):
+ pfile = open(ppath, 'wb')
+ iwrite = None
+ if ipath:
+ ifile = open(ipath, 'wb')
+ iwrite = ifile.write
+ #END handle ip
+
+ # make sure we rewind the streams ... we work on the same objects over and over again
+ if iteration > 0:
+ rewind_streams()
+ #END rewind streams
+ iteration += 1
+
+ pack_sha, index_sha = PackEntity.write_pack(pack_objs, pfile.write, iwrite, object_count=num_obj)
+ pfile.close()
+ assert os.path.getsize(ppath) > 100
+
+ # verify pack
+ pf = PackFile(ppath)
+ assert pf.size() == len(pack_objs)
+ assert pf.version() == PackFile.pack_version_default
+ assert pf.checksum() == pack_sha
+
+ # verify index
+ if ipath is not None:
+ ifile.close()
+ assert os.path.getsize(ipath) > 100
+ idx = PackIndexFile(ipath)
+ assert idx.version() == PackIndexFile.index_version_default
+ assert idx.packfile_checksum() == pack_sha
+ assert idx.indexfile_checksum() == index_sha
+ assert idx.size() == len(pack_objs)
+ #END verify files exist
+ #END for each packpath, indexpath pair
+
+        # verify the packs thoroughly
+ rewind_streams()
+ entity = PackEntity.create(pack_objs, rw_dir)
+ count = 0
+ for info in entity.info_iter():
+ count += 1
+ for use_crc in range(2):
+ assert entity.is_valid_stream(info.binsha, use_crc)
+ # END for each crc mode
+ #END for each info
+ assert count == len(pack_objs)
+
+
+ def test_pack_64(self):
+        # TODO: hex-edit a pack to help verify that we can handle 64 bit offsets,
+        # of course without really needing such a huge pack
+ raise SkipTest()
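
The pack-writing loop in test_pack_entity boils down to the sketch below; pack_objs is assumed to be a list of fully resolved object streams (for instance collected via PackEntity.stream_iter() as above), and both output paths are placeholders.

    from gitdb.pack import PackEntity, PackFile, PackIndexFile

    def write_and_verify(pack_objs, pack_path, index_path):
        pfile = open(pack_path, 'wb')
        ifile = open(index_path, 'wb')
        pack_sha, index_sha = PackEntity.write_pack(pack_objs, pfile.write, ifile.write,
                                                    object_count=len(pack_objs))
        pfile.close()
        ifile.close()

        # the checksums returned by write_pack must match what ends up on disk
        assert PackFile(pack_path).checksum() == pack_sha
        idx = PackIndexFile(index_path)
        assert idx.packfile_checksum() == pack_sha
        assert idx.indexfile_checksum() == index_sha
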
diff --git a/gitdb/test/test_stream.py b/gitdb/test/test_stream.py
index 523f770..6dc2746 100644
--- a/gitdb/test/test_stream.py
+++ b/gitdb/test/test_stream.py
@@ -4,24 +4,24 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""
from lib import (
- TestBase,
- DummyStream,
- Sha1Writer,
- make_bytes,
- make_object,
- fixture_path
- )
+ TestBase,
+ DummyStream,
+ Sha1Writer,
+ make_bytes,
+ make_object,
+ fixture_path
+ )
from gitdb import *
from gitdb.util import (
- NULL_HEX_SHA,
- hex_to_bin
- )
+ NULL_HEX_SHA,
+ hex_to_bin
+ )
from gitdb.util import zlib
from gitdb.typ import (
- str_blob_type
- )
+ str_blob_type
+ )
import time
import tempfile
@@ -31,124 +31,124 @@ import os
class TestStream(TestBase):
- """Test stream classes"""
-
- data_sizes = (15, 10000, 1000*1024+512)
-
- def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
- """Make stream tests - the orig_stream is seekable, allowing it to be
- rewound and reused
- :param cdata: the data we expect to read from stream, the contents
- :param rewind_stream: function called to rewind the stream to make it ready
- for reuse"""
- ns = 10
- assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata))
-
- # read in small steps
- ss = len(cdata) / ns
- for i in range(ns):
- data = stream.read(ss)
- chunk = cdata[i*ss:(i+1)*ss]
- assert data == chunk
- # END for each step
- rest = stream.read()
- if rest:
- assert rest == cdata[-len(rest):]
- # END handle rest
-
- if isinstance(stream, DecompressMemMapReader):
- assert len(stream.data()) == stream.compressed_bytes_read()
- # END handle special type
-
- rewind_stream(stream)
-
- # read everything
- rdata = stream.read()
- assert rdata == cdata
-
- if isinstance(stream, DecompressMemMapReader):
- assert len(stream.data()) == stream.compressed_bytes_read()
- # END handle special type
-
- def test_decompress_reader(self):
- for close_on_deletion in range(2):
- for with_size in range(2):
- for ds in self.data_sizes:
- cdata = make_bytes(ds, randomize=False)
-
- # zdata = zipped actual data
- # cdata = original content data
-
- # create reader
- if with_size:
- # need object data
- zdata = zlib.compress(make_object(str_blob_type, cdata))
- type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
- assert size == len(cdata)
- assert type == str_blob_type
-
- # even if we don't set the size, it will be set automatically on first read
- test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
- assert test_reader._s == len(cdata)
- else:
- # here we need content data
- zdata = zlib.compress(cdata)
- reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
- assert reader._s == len(cdata)
- # END get reader
-
- self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))
-
- # put in a dummy stream for closing
- dummy = DummyStream()
- reader._m = dummy
-
- assert not dummy.closed
- del(reader)
- assert dummy.closed == close_on_deletion
- # END for each datasize
- # END whether size should be used
- # END whether stream should be closed when deleted
-
- def test_sha_writer(self):
- writer = Sha1Writer()
- assert 2 == writer.write("hi")
- assert len(writer.sha(as_hex=1)) == 40
- assert len(writer.sha(as_hex=0)) == 20
-
- # make sure it does something ;)
- prev_sha = writer.sha()
- writer.write("hi again")
- assert writer.sha() != prev_sha
-
- def test_compressed_writer(self):
- for ds in self.data_sizes:
- fd, path = tempfile.mkstemp()
- ostream = FDCompressedSha1Writer(fd)
- data = make_bytes(ds, randomize=False)
-
- # for now, just a single write, code doesn't care about chunking
- assert len(data) == ostream.write(data)
- ostream.close()
-
- # its closed already
- self.failUnlessRaises(OSError, os.close, fd)
-
- # read everything back, compare to data we zip
- fd = os.open(path, os.O_RDONLY|getattr(os, 'O_BINARY', 0))
- written_data = os.read(fd, os.path.getsize(path))
- assert len(written_data) == os.path.getsize(path)
- os.close(fd)
- assert written_data == zlib.compress(data, 1) # best speed
-
- os.remove(path)
- # END for each os
-
- def test_decompress_reader_special_case(self):
- odb = LooseObjectDB(fixture_path('objects'))
- ostream = odb.stream(hex_to_bin('7bb839852ed5e3a069966281bb08d50012fb309b'))
-
- # if there is a bug, we will be missing one byte exactly !
- data = ostream.read()
- assert len(data) == ostream.size
-
+ """Test stream classes"""
+
+ data_sizes = (15, 10000, 1000*1024+512)
+
+ def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
+ """Make stream tests - the orig_stream is seekable, allowing it to be
+ rewound and reused
+ :param cdata: the data we expect to read from stream, the contents
+ :param rewind_stream: function called to rewind the stream to make it ready
+ for reuse"""
+ ns = 10
+ assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata))
+
+ # read in small steps
+ ss = len(cdata) / ns
+ for i in range(ns):
+ data = stream.read(ss)
+ chunk = cdata[i*ss:(i+1)*ss]
+ assert data == chunk
+ # END for each step
+ rest = stream.read()
+ if rest:
+ assert rest == cdata[-len(rest):]
+ # END handle rest
+
+ if isinstance(stream, DecompressMemMapReader):
+ assert len(stream.data()) == stream.compressed_bytes_read()
+ # END handle special type
+
+ rewind_stream(stream)
+
+ # read everything
+ rdata = stream.read()
+ assert rdata == cdata
+
+ if isinstance(stream, DecompressMemMapReader):
+ assert len(stream.data()) == stream.compressed_bytes_read()
+ # END handle special type
+
+ def test_decompress_reader(self):
+ for close_on_deletion in range(2):
+ for with_size in range(2):
+ for ds in self.data_sizes:
+ cdata = make_bytes(ds, randomize=False)
+
+ # zdata = zipped actual data
+ # cdata = original content data
+
+ # create reader
+ if with_size:
+ # need object data
+ zdata = zlib.compress(make_object(str_blob_type, cdata))
+ type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
+ assert size == len(cdata)
+ assert type == str_blob_type
+
+ # even if we don't set the size, it will be set automatically on first read
+ test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
+ assert test_reader._s == len(cdata)
+ else:
+ # here we need content data
+ zdata = zlib.compress(cdata)
+ reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
+ assert reader._s == len(cdata)
+ # END get reader
+
+ self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))
+
+ # put in a dummy stream for closing
+ dummy = DummyStream()
+ reader._m = dummy
+
+ assert not dummy.closed
+ del(reader)
+ assert dummy.closed == close_on_deletion
+ # END for each datasize
+ # END whether size should be used
+ # END whether stream should be closed when deleted
+
+ def test_sha_writer(self):
+ writer = Sha1Writer()
+ assert 2 == writer.write("hi")
+ assert len(writer.sha(as_hex=1)) == 40
+ assert len(writer.sha(as_hex=0)) == 20
+
+ # make sure it does something ;)
+ prev_sha = writer.sha()
+ writer.write("hi again")
+ assert writer.sha() != prev_sha
+
+ def test_compressed_writer(self):
+ for ds in self.data_sizes:
+ fd, path = tempfile.mkstemp()
+ ostream = FDCompressedSha1Writer(fd)
+ data = make_bytes(ds, randomize=False)
+
+ # for now, just a single write, code doesn't care about chunking
+ assert len(data) == ostream.write(data)
+ ostream.close()
+
+            # it's closed already
+ self.failUnlessRaises(OSError, os.close, fd)
+
+ # read everything back, compare to data we zip
+ fd = os.open(path, os.O_RDONLY|getattr(os, 'O_BINARY', 0))
+ written_data = os.read(fd, os.path.getsize(path))
+ assert len(written_data) == os.path.getsize(path)
+ os.close(fd)
+ assert written_data == zlib.compress(data, 1) # best speed
+
+ os.remove(path)
+ # END for each os
+
+ def test_decompress_reader_special_case(self):
+ odb = LooseObjectDB(fixture_path('objects'))
+ ostream = odb.stream(hex_to_bin('7bb839852ed5e3a069966281bb08d50012fb309b'))
+
+ # if there is a bug, we will be missing one byte exactly !
+ data = ostream.read()
+ assert len(data) == ostream.size
+
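
As a reading aid, the write/read pair exercised by test_compressed_writer and test_decompress_reader combines into one small round trip; this is a sketch using the same classes the tests import from gitdb.

    import os
    import tempfile
    from gitdb import FDCompressedSha1Writer, DecompressMemMapReader

    data = "x" * 1000
    fd, path = tempfile.mkstemp()
    writer = FDCompressedSha1Writer(fd)
    writer.write(data)
    writer.close()                      # flushes the zlib stream and closes the descriptor

    # the file now holds plain zlib data, so it can be handed back to the reader
    # together with the known uncompressed size
    zdata = open(path, 'rb').read()
    reader = DecompressMemMapReader(zdata, False, len(data))
    assert reader.read() == data
    os.remove(path)
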
diff --git a/gitdb/test/test_util.py b/gitdb/test/test_util.py
index 90f4156..35f9f44 100644
--- a/gitdb/test/test_util.py
+++ b/gitdb/test/test_util.py
@@ -8,98 +8,98 @@ import os
from lib import TestBase
from gitdb.util import (
- to_hex_sha,
- to_bin_sha,
- NULL_HEX_SHA,
- LockedFD
- )
+ to_hex_sha,
+ to_bin_sha,
+ NULL_HEX_SHA,
+ LockedFD
+ )
-
+
class TestUtils(TestBase):
- def test_basics(self):
- assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA
- assert len(to_bin_sha(NULL_HEX_SHA)) == 20
- assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA
-
- def _cmp_contents(self, file_path, data):
- # raise if data from file at file_path
- # does not match data string
- fp = open(file_path, "rb")
- try:
- assert fp.read() == data
- finally:
- fp.close()
-
- def test_lockedfd(self):
- my_file = tempfile.mktemp()
- orig_data = "hello"
- new_data = "world"
- my_file_fp = open(my_file, "wb")
- my_file_fp.write(orig_data)
- my_file_fp.close()
-
- try:
- lfd = LockedFD(my_file)
- lockfilepath = lfd._lockfilepath()
-
- # cannot end before it was started
- self.failUnlessRaises(AssertionError, lfd.rollback)
- self.failUnlessRaises(AssertionError, lfd.commit)
-
- # open for writing
- assert not os.path.isfile(lockfilepath)
- wfd = lfd.open(write=True)
- assert lfd._fd is wfd
- assert os.path.isfile(lockfilepath)
-
- # write data and fail
- os.write(wfd, new_data)
- lfd.rollback()
- assert lfd._fd is None
- self._cmp_contents(my_file, orig_data)
- assert not os.path.isfile(lockfilepath)
-
- # additional call doesnt fail
- lfd.commit()
- lfd.rollback()
-
- # test reading
- lfd = LockedFD(my_file)
- rfd = lfd.open(write=False)
- assert os.read(rfd, len(orig_data)) == orig_data
-
- assert os.path.isfile(lockfilepath)
- # deletion rolls back
- del(lfd)
- assert not os.path.isfile(lockfilepath)
-
-
- # write data - concurrently
- lfd = LockedFD(my_file)
- olfd = LockedFD(my_file)
- assert not os.path.isfile(lockfilepath)
- wfdstream = lfd.open(write=True, stream=True) # this time as stream
- assert os.path.isfile(lockfilepath)
- # another one fails
- self.failUnlessRaises(IOError, olfd.open)
-
- wfdstream.write(new_data)
- lfd.commit()
- assert not os.path.isfile(lockfilepath)
- self._cmp_contents(my_file, new_data)
-
- # could test automatic _end_writing on destruction
- finally:
- os.remove(my_file)
- # END final cleanup
-
- # try non-existing file for reading
- lfd = LockedFD(tempfile.mktemp())
- try:
- lfd.open(write=False)
- except OSError:
- assert not os.path.exists(lfd._lockfilepath())
- else:
- self.fail("expected OSError")
- # END handle exceptions
+ def test_basics(self):
+ assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA
+ assert len(to_bin_sha(NULL_HEX_SHA)) == 20
+ assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA
+
+ def _cmp_contents(self, file_path, data):
+ # raise if data from file at file_path
+ # does not match data string
+ fp = open(file_path, "rb")
+ try:
+ assert fp.read() == data
+ finally:
+ fp.close()
+
+ def test_lockedfd(self):
+ my_file = tempfile.mktemp()
+ orig_data = "hello"
+ new_data = "world"
+ my_file_fp = open(my_file, "wb")
+ my_file_fp.write(orig_data)
+ my_file_fp.close()
+
+ try:
+ lfd = LockedFD(my_file)
+ lockfilepath = lfd._lockfilepath()
+
+ # cannot end before it was started
+ self.failUnlessRaises(AssertionError, lfd.rollback)
+ self.failUnlessRaises(AssertionError, lfd.commit)
+
+ # open for writing
+ assert not os.path.isfile(lockfilepath)
+ wfd = lfd.open(write=True)
+ assert lfd._fd is wfd
+ assert os.path.isfile(lockfilepath)
+
+ # write data and fail
+ os.write(wfd, new_data)
+ lfd.rollback()
+ assert lfd._fd is None
+ self._cmp_contents(my_file, orig_data)
+ assert not os.path.isfile(lockfilepath)
+
+            # an additional call doesn't fail
+ lfd.commit()
+ lfd.rollback()
+
+ # test reading
+ lfd = LockedFD(my_file)
+ rfd = lfd.open(write=False)
+ assert os.read(rfd, len(orig_data)) == orig_data
+
+ assert os.path.isfile(lockfilepath)
+ # deletion rolls back
+ del(lfd)
+ assert not os.path.isfile(lockfilepath)
+
+
+ # write data - concurrently
+ lfd = LockedFD(my_file)
+ olfd = LockedFD(my_file)
+ assert not os.path.isfile(lockfilepath)
+ wfdstream = lfd.open(write=True, stream=True) # this time as stream
+ assert os.path.isfile(lockfilepath)
+ # another one fails
+ self.failUnlessRaises(IOError, olfd.open)
+
+ wfdstream.write(new_data)
+ lfd.commit()
+ assert not os.path.isfile(lockfilepath)
+ self._cmp_contents(my_file, new_data)
+
+ # could test automatic _end_writing on destruction
+ finally:
+ os.remove(my_file)
+ # END final cleanup
+
+ # try non-existing file for reading
+ lfd = LockedFD(tempfile.mktemp())
+ try:
+ lfd.open(write=False)
+ except OSError:
+ assert not os.path.exists(lfd._lockfilepath())
+ else:
+ self.fail("expected OSError")
+ # END handle exceptions
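
Finally, the LockedFD protocol tested above follows a simple open/write/commit (or rollback) cycle. A minimal sketch, with the target path being a placeholder:

    import os
    from gitdb.util import LockedFD

    path = "/tmp/lockedfd_demo"          # placeholder target file
    fp = open(path, "wb")
    fp.write("old")
    fp.close()

    lfd = LockedFD(path)
    fd = lfd.open(write=True)            # creates a lock file next to the target
    os.write(fd, "new")
    lfd.commit()                         # replaces the original and removes the lock

    fp = open(path, "rb")
    assert fp.read() == "new"
    fp.close()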