summaryrefslogtreecommitdiff
path: root/gitdb/test/test_util.py
blob: 3b3165d140ea512cbbd02d7fd767a59f359b1cd5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Test for object db"""
import tempfile
import os

from gitdb.test.lib import TestBase
from gitdb.util import (
    to_hex_sha,
    to_bin_sha,
    NULL_HEX_SHA,
    LockedFD
)


class TestUtils(TestBase):

    def test_basics(self):
        assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA
        assert len(to_bin_sha(NULL_HEX_SHA)) == 20
        assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA.encode("ascii")

    def _cmp_contents(self, file_path, data):
        # raise if data from file at file_path
        # does not match data string
        with open(file_path, "rb") as fp:
            assert fp.read() == data.encode("ascii")

    def test_lockedfd(self):
        my_file = tempfile.mktemp()
        orig_data = "hello"
        new_data = "world"
        with open(my_file, "wb") as my_file_fp:
            my_file_fp.write(orig_data.encode("ascii"))

        try:
            lfd = LockedFD(my_file)
            lockfilepath = lfd._lockfilepath()

            # cannot end before it was started
            self.assertRaises(AssertionError, lfd.rollback)
            self.assertRaises(AssertionError, lfd.commit)

            # open for writing
            assert not os.path.isfile(lockfilepath)
            wfd = lfd.open(write=True)
            assert lfd._fd is wfd
            assert os.path.isfile(lockfilepath)

            # write data and fail
            os.write(wfd, new_data.encode("ascii"))
            lfd.rollback()
            assert lfd._fd is None
            self._cmp_contents(my_file, orig_data)
            assert not os.path.isfile(lockfilepath)

            # additional call doesn't fail
            lfd.commit()
            lfd.rollback()

            # test reading
            lfd = LockedFD(my_file)
            rfd = lfd.open(write=False)
            assert os.read(rfd, len(orig_data)) == orig_data.encode("ascii")

            assert os.path.isfile(lockfilepath)
            # deletion rolls back
            del(lfd)
            assert not os.path.isfile(lockfilepath)

            # write data - concurrently
            lfd = LockedFD(my_file)
            olfd = LockedFD(my_file)
            assert not os.path.isfile(lockfilepath)
            wfdstream = lfd.open(write=True, stream=True)       # this time as stream
            assert os.path.isfile(lockfilepath)
            # another one fails
            self.assertRaises(IOError, olfd.open)

            wfdstream.write(new_data.encode("ascii"))
            lfd.commit()
            assert not os.path.isfile(lockfilepath)
            self._cmp_contents(my_file, new_data)

            # could test automatic _end_writing on destruction
        finally:
            os.remove(my_file)
        # END final cleanup

        # try non-existing file for reading
        lfd = LockedFD(tempfile.mktemp())
        try:
            lfd.open(write=False)
        except OSError:
            assert not os.path.exists(lfd._lockfilepath())
        else:
            self.fail("expected OSError")
        # END handle exceptions