Diffstat (limited to 'gitdb/test/test_stream.py')
-rw-r--r-- | gitdb/test/test_stream.py | 101
1 file changed, 49 insertions(+), 52 deletions(-)
diff --git a/gitdb/test/test_stream.py b/gitdb/test/test_stream.py
index 6dc2746..50db44b 100644
--- a/gitdb/test/test_stream.py
+++ b/gitdb/test/test_stream.py
@@ -3,49 +3,47 @@
 # This module is part of GitDB and is released under
 # the New BSD License: http://www.opensource.org/licenses/bsd-license.php
 """Test for object db"""
-from lib import (
-    TestBase,
-    DummyStream,
-    Sha1Writer,
-    make_bytes,
-    make_object,
-    fixture_path
-    )
-
-from gitdb import *
-from gitdb.util import (
-    NULL_HEX_SHA,
-    hex_to_bin
-    )
-
-from gitdb.util import zlib
+
+from gitdb.test.lib import (
+    TestBase,
+    DummyStream,
+    make_bytes,
+    make_object,
+    fixture_path
+)
+
+from gitdb import (
+    DecompressMemMapReader,
+    FDCompressedSha1Writer,
+    LooseObjectDB,
+    Sha1Writer
+)
+from gitdb.util import hex_to_bin
+
+import zlib
 from gitdb.typ import (
     str_blob_type
-    )
+)
 
-import time
 import tempfile
 import os
-
-
-
 
 class TestStream(TestBase):
     """Test stream classes"""
-    
+
     data_sizes = (15, 10000, 1000*1024+512)
-    
+
     def _assert_stream_reader(self, stream, cdata, rewind_stream=lambda s: None):
-        """Make stream tests - the orig_stream is seekable, allowing it to be 
+        """Make stream tests - the orig_stream is seekable, allowing it to be
         rewound and reused
         :param cdata: the data we expect to read from stream, the contents
         :param rewind_stream: function called to rewind the stream to make it ready
             for reuse"""
         ns = 10
         assert len(cdata) > ns-1, "Data must be larger than %i, was %i" % (ns, len(cdata))
-        
+
         # read in small steps
-        ss = len(cdata) / ns
+        ss = len(cdata) // ns
         for i in range(ns):
             data = stream.read(ss)
             chunk = cdata[i*ss:(i+1)*ss]
@@ -55,38 +53,38 @@ class TestStream(TestBase):
         if rest:
             assert rest == cdata[-len(rest):]
         # END handle rest
-        
+
         if isinstance(stream, DecompressMemMapReader):
             assert len(stream.data()) == stream.compressed_bytes_read()
         # END handle special type
-        
+
         rewind_stream(stream)
-        
+
         # read everything
         rdata = stream.read()
         assert rdata == cdata
-        
+
         if isinstance(stream, DecompressMemMapReader):
             assert len(stream.data()) == stream.compressed_bytes_read()
         # END handle special type
-        
+
     def test_decompress_reader(self):
         for close_on_deletion in range(2):
             for with_size in range(2):
                 for ds in self.data_sizes:
                     cdata = make_bytes(ds, randomize=False)
-                    
+
                     # zdata = zipped actual data
                     # cdata = original content data
-                    
+
                     # create reader
                     if with_size:
                         # need object data
                         zdata = zlib.compress(make_object(str_blob_type, cdata))
-                        type, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
+                        typ, size, reader = DecompressMemMapReader.new(zdata, close_on_deletion)
                         assert size == len(cdata)
-                        assert type == str_blob_type
-                        
+                        assert typ == str_blob_type
+
                         # even if we don't set the size, it will be set automatically on first read
                         test_reader = DecompressMemMapReader(zdata, close_on_deletion=False)
                         assert test_reader._s == len(cdata)
@@ -95,60 +93,59 @@ class TestStream(TestBase):
                         zdata = zlib.compress(cdata)
                         reader = DecompressMemMapReader(zdata, close_on_deletion, len(cdata))
                         assert reader._s == len(cdata)
-                    # END get reader 
-                    
+                    # END get reader
+
                     self._assert_stream_reader(reader, cdata, lambda r: r.seek(0))
-                    
+
                     # put in a dummy stream for closing
                     dummy = DummyStream()
                     reader._m = dummy
-                    
+
                     assert not dummy.closed
                     del(reader)
                     assert dummy.closed == close_on_deletion
                 # END for each datasize
             # END whether size should be used
         # END whether stream should be closed when deleted
-        
+
     def test_sha_writer(self):
         writer = Sha1Writer()
-        assert 2 == writer.write("hi")
+        assert 2 == writer.write("hi".encode("ascii"))
         assert len(writer.sha(as_hex=1)) == 40
         assert len(writer.sha(as_hex=0)) == 20
-        
+
         # make sure it does something ;)
         prev_sha = writer.sha()
-        writer.write("hi again")
+        writer.write("hi again".encode("ascii"))
         assert writer.sha() != prev_sha
-        
+
     def test_compressed_writer(self):
         for ds in self.data_sizes:
             fd, path = tempfile.mkstemp()
             ostream = FDCompressedSha1Writer(fd)
             data = make_bytes(ds, randomize=False)
-            
+
            # for now, just a single write, code doesn't care about chunking
            assert len(data) == ostream.write(data)
            ostream.close()
-            
+
            # its closed already
            self.failUnlessRaises(OSError, os.close, fd)
-            
+
            # read everything back, compare to data we zip
            fd = os.open(path, os.O_RDONLY|getattr(os, 'O_BINARY', 0))
            written_data = os.read(fd, os.path.getsize(path))
            assert len(written_data) == os.path.getsize(path)
            os.close(fd)
            assert written_data == zlib.compress(data, 1)    # best speed
-            
+
            os.remove(path)
        # END for each os
-    
+
    def test_decompress_reader_special_case(self):
        odb = LooseObjectDB(fixture_path('objects'))
        ostream = odb.stream(hex_to_bin('7bb839852ed5e3a069966281bb08d50012fb309b'))
-        
+
        # if there is a bug, we will be missing one byte exactly !
        data = ostream.read()
        assert len(data) == ostream.size
-
