summaryrefslogtreecommitdiff
path: root/kazoo/tests/test_election.py
blob: 45ef0be7ab0076ed265e9dd82b560c8a80abb231 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import uuid
import sys
import threading

import pytest

from kazoo.testing import KazooTestCase
from kazoo.tests.util import wait


class UniqueError(Exception):
    """Error raised only by test leader function
    """


class KazooElectionTests(KazooTestCase):
    def setUp(self):
        super(KazooElectionTests, self).setUp()
        self.path = "/" + uuid.uuid4().hex

        self.condition = threading.Condition()

        # election contenders set these when elected. The exit event is set by
        # the test to make the leader exit.
        self.leader_id = None
        self.exit_event = None

        # tests set this before the event to make the leader raise an error
        self.raise_exception = False

        # set by a worker thread when an unexpected error is hit.
        # better way to do this?
        self.thread_exc_info = None

    def _spawn_contender(self, contender_id, election):
        thread = threading.Thread(target=self._election_thread,
                                  args=(contender_id, election))
        thread.daemon = True
        thread.start()
        return thread

    def _election_thread(self, contender_id, election):
        try:
            election.run(self._leader_func, contender_id)
        except UniqueError:
            if not self.raise_exception:
                self.thread_exc_info = sys.exc_info()
        except Exception:
            self.thread_exc_info = sys.exc_info()
        else:
            if self.raise_exception:
                e = Exception("expected leader func to raise exception")
                self.thread_exc_info = (Exception, e, None)

    def _leader_func(self, name):
        exit_event = threading.Event()
        with self.condition:
            self.exit_event = exit_event
            self.leader_id = name
            self.condition.notify_all()

        exit_event.wait(45)
        if self.raise_exception:
            raise UniqueError("expected error in the leader function")

    def _check_thread_error(self):
        if self.thread_exc_info:
            t, o, tb = self.thread_exc_info
            raise t(o)

    def test_election(self):
        elections = {}
        threads = {}
        for _ in range(3):
            contender = "c" + uuid.uuid4().hex
            elections[contender] = self.client.Election(self.path, contender)
            threads[contender] = self._spawn_contender(
                contender, elections[contender])

        # wait for a leader to be elected
        times = 0
        with self.condition:
            while not self.leader_id:
                self.condition.wait(5)
                times += 1
                if times > 5:
                    raise Exception("Still not a leader: lid: %s",
                                    self.leader_id)

        election = self.client.Election(self.path)

        # make sure all contenders are in the pool
        wait(lambda: len(election.contenders()) == len(elections))
        contenders = election.contenders()

        assert set(contenders) == set(elections.keys())

        # first one in list should be leader
        first_leader = contenders[0]
        assert first_leader == self.leader_id

        # tell second one to cancel election. should never get elected.
        elections[contenders[1]].cancel()

        # make leader exit. third contender should be elected.
        self.exit_event.set()
        with self.condition:
            while self.leader_id == first_leader:
                self.condition.wait(45)
        assert self.leader_id == contenders[2]
        self._check_thread_error()

        # make first contender re-enter the race
        threads[first_leader].join()
        threads[first_leader] = self._spawn_contender(
            first_leader, elections[first_leader]
        )

        # contender set should now be the current leader plus the first leader
        wait(lambda: len(election.contenders()) == 2)
        contenders = election.contenders()
        assert set(contenders), set([self.leader_id == first_leader])

        # make current leader raise an exception. first should be reelected
        self.raise_exception = True
        self.exit_event.set()
        with self.condition:
            while self.leader_id != first_leader:
                self.condition.wait(45)
        assert self.leader_id == first_leader
        self._check_thread_error()

        self.exit_event.set()
        for thread in threads.values():
            thread.join()
        self._check_thread_error()

    def test_bad_func(self):
        election = self.client.Election(self.path)
        with pytest.raises(ValueError):
            election.run("not a callable")