diff options
Diffstat (limited to 'gitdb/test/lib.py')
| -rw-r--r-- | gitdb/test/lib.py | 67 |
1 files changed, 38 insertions, 29 deletions
diff --git a/gitdb/test/lib.py b/gitdb/test/lib.py index ac8473a..d09b1cb 100644 --- a/gitdb/test/lib.py +++ b/gitdb/test/lib.py @@ -3,20 +3,14 @@ # This module is part of GitDB and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Utilities used in ODB testing""" -from gitdb import ( - OStream, - ) -from gitdb.stream import ( - Sha1Writer, - ZippedStoreShaWriter - ) - -from gitdb.util import zlib +from gitdb import OStream +from gitdb.utils.compat import xrange import sys import random from array import array -from cStringIO import StringIO + +from io import BytesIO import glob import unittest @@ -24,20 +18,36 @@ import tempfile import shutil import os import gc +from functools import wraps #{ Bases class TestBase(unittest.TestCase): """Base class for all tests""" - + #} END bases #{ Decorators +def skip_on_travis_ci(func): + """All tests decorated with this one will raise SkipTest when run on travis ci. + Use it to workaround difficult to solve issues + NOTE: copied from bcore (https://github.com/Byron/bcore)""" + @wraps(func) + def wrapper(self, *args, **kwargs): + if 'TRAVIS' in os.environ: + import nose + raise nose.SkipTest("Cannot run on travis-ci") + # end check for travis ci + return func(self, *args, **kwargs) + # end wrapper + return wrapper + + def with_rw_directory(func): - """Create a temporary directory which can be written to, remove it if the + """Create a temporary directory which can be written to, remove it if the test suceeds, but leave it otherwise to aid additional debugging""" def wrapper(self): path = tempfile.mktemp(prefix=func.__name__) @@ -47,12 +57,12 @@ def with_rw_directory(func): try: return func(self, path) except Exception: - print >> sys.stderr, "Test %s.%s failed, output is at %r" % (type(self).__name__, func.__name__, path) + sys.stderr.write("Test %s.%s failed, output is at %r\n" % (type(self).__name__, func.__name__, path)) keep = True raise finally: # Need to collect here to be sure all handles have been closed. It appears - # a windows-only issue. In fact things should be deleted, as well as + # a windows-only issue. In fact things should be deleted, as well as # memory maps closed, once objects go out of scope. For some reason # though this is not the case here unless we collect explicitly. if not keep: @@ -60,20 +70,20 @@ def with_rw_directory(func): shutil.rmtree(path) # END handle exception # END wrapper - + wrapper.__name__ = func.__name__ return wrapper def with_packs_rw(func): - """Function that provides a path into which the packs for testing should be + """Function that provides a path into which the packs for testing should be copied. Will pass on the path to the actual function afterwards""" def wrapper(self, path): src_pack_glob = fixture_path('packs/*') copy_files_globbed(src_pack_glob, path, hard_link_ok=True) return func(self, path) # END wrapper - + wrapper.__name__ = func.__name__ return wrapper @@ -86,10 +96,10 @@ def fixture_path(relapath=''): :param relapath: relative path into the fixtures directory, or '' to obtain the fixture directory itself""" return os.path.join(os.path.dirname(__file__), 'fixtures', relapath) - + def copy_files_globbed(source_glob, target_dir, hard_link_ok=False): """Copy all files found according to the given source glob into the target directory - :param hard_link_ok: if True, hard links will be created if possible. Otherwise + :param hard_link_ok: if True, hard links will be created if possible. Otherwise the files will be copied""" for src_file in glob.glob(source_glob): if hard_link_ok and hasattr(os, 'link'): @@ -103,12 +113,12 @@ def copy_files_globbed(source_glob, target_dir, hard_link_ok=False): shutil.copy(src_file, target_dir) # END try hard link # END for each file to copy - + def make_bytes(size_in_bytes, randomize=False): """:return: string with given size in bytes :param randomize: try to produce a very random stream""" - actual_size = size_in_bytes / 4 + actual_size = size_in_bytes // 4 producer = xrange(actual_size) if randomize: producer = list(producer) @@ -120,13 +130,13 @@ def make_bytes(size_in_bytes, randomize=False): def make_object(type, data): """:return: bytes resembling an uncompressed object""" odata = "blob %i\0" % len(data) - return odata + data - + return odata.encode("ascii") + data + def make_memory_file(size_in_bytes, randomize=False): """:return: tuple(size_of_stream, stream) :param randomize: try to produce a very random stream""" d = make_bytes(size_in_bytes, randomize) - return len(d), StringIO(d) + return len(d), BytesIO(d) #} END routines @@ -137,14 +147,14 @@ class DummyStream(object): self.was_read = False self.bytes = 0 self.closed = False - + def read(self, size): self.was_read = True self.bytes = size - + def close(self): self.closed = True - + def _assert(self): assert self.was_read @@ -153,10 +163,9 @@ class DeriveTest(OStream): def __init__(self, sha, type, size, stream, *args, **kwargs): self.myarg = kwargs.pop('myarg') self.args = args - + def _assert(self): assert self.args assert self.myarg #} END stream utilitiess - |
