summaryrefslogtreecommitdiff
path: root/kazoo/tests/test_retry.py
blob: 3ce4a418125f3c9639c49548d0a84efea8d70fdb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import unittest

from nose.tools import eq_


class TestRetrySleeper(unittest.TestCase):

    def _pass(self):
        pass

    def _fail(self, times=1):
        from kazoo.retry import ForceRetryError
        scope = dict(times=0)

        def inner():
            if scope['times'] >= times:
                pass
            else:
                scope['times'] += 1
                raise ForceRetryError('Failed!')
        return inner

    def _makeOne(self, *args, **kwargs):
        from kazoo.retry import KazooRetry
        return KazooRetry(*args, **kwargs)

    def test_reset(self):
        retry = self._makeOne(delay=0, max_tries=2)
        retry(self._fail())
        eq_(retry._attempts, 1)
        retry.reset()
        eq_(retry._attempts, 0)

    def test_too_many_tries(self):
        from kazoo.retry import RetryFailedError
        retry = self._makeOne(delay=0)
        self.assertRaises(RetryFailedError, retry, self._fail(times=999))
        eq_(retry._attempts, 1)

    def test_maximum_delay(self):
        def sleep_func(_time):
            pass

        retry = self._makeOne(delay=10, max_tries=100, sleep_func=sleep_func)
        retry(self._fail(times=10))
        self.assertTrue(retry._cur_delay < 4000, retry._cur_delay)
        # gevent's sleep function is picky about the type
        eq_(type(retry._cur_delay), float)

    def test_copy(self):
        def _sleep(t):
            return None

        retry = self._makeOne(sleep_func=_sleep)
        rcopy = retry.copy()
        self.assertTrue(rcopy.sleep_func is _sleep)


class TestKazooRetry(unittest.TestCase):

    def _makeOne(self, **kw):
        from kazoo.retry import KazooRetry
        return KazooRetry(**kw)

    def test_connection_closed(self):
        from kazoo.exceptions import ConnectionClosedError
        retry = self._makeOne()

        def testit():
            raise ConnectionClosedError()
        self.assertRaises(ConnectionClosedError, retry, testit)

    def test_session_expired(self):
        from kazoo.exceptions import SessionExpiredError
        retry = self._makeOne(max_tries=1)

        def testit():
            raise SessionExpiredError()
        self.assertRaises(Exception, retry, testit)