diff options
author | Julien Danjou <julien@danjou.info> | 2014-04-01 17:28:38 +0200 |
---|---|---|
committer | Julien Danjou <julien@danjou.info> | 2014-04-07 13:54:34 +0200 |
commit | dda34fac81a9e92efb602e9a8ceb79e8d311dc00 (patch) | |
tree | dd155a9e45022efc528ae06e7906198361bada00 | |
parent | 3123c5a1fe72a9944364deb9b9ff41092238989b (diff) | |
download | tooz-dda34fac81a9e92efb602e9a8ceb79e8d311dc00.tar.gz |
coordination, zookeeper: implement leader election
Change-Id: Ic3d1934b87be37ba56f744abd045adb634bcdb08
-rw-r--r-- | tooz/coordination.py | 47 | ||||
-rw-r--r-- | tooz/drivers/memcached.py | 8 | ||||
-rw-r--r-- | tooz/drivers/zookeeper.py | 44 | ||||
-rw-r--r-- | tooz/tests/test_coordination.py | 88 |
4 files changed, 183 insertions, 4 deletions
diff --git a/tooz/coordination.py b/tooz/coordination.py index d2d76ea..75fb6c3 100644 --- a/tooz/coordination.py +++ b/tooz/coordination.py @@ -48,12 +48,21 @@ class MemberLeftGroup(Event): self.member_id = member_id +class LeaderElected(Event): + """A leader as been elected.""" + + def __init__(self, group_id, member_id): + self.group_id = group_id + self.member_id = member_id + + @six.add_metaclass(abc.ABCMeta) class CoordinationDriver(object): def __init__(self): self._hooks_join_group = collections.defaultdict(Hooks) self._hooks_leave_group = collections.defaultdict(Hooks) + self._hooks_elected_leader = collections.defaultdict(Hooks) # A cache for group members self._group_members = collections.defaultdict(set) @@ -118,6 +127,44 @@ class CoordinationDriver(object): and group_id in self._group_members): del self._group_members[group_id] + @abc.abstractmethod + def watch_elected_as_leader(self, group_id, callback): + """Call a function when member gets elected as leader. + + The callback functions will be executed when `run_watchers` is + called. + + :param group_id: The group id to watch + :param callback: The function to execute when a member leaves this + group + + """ + self._hooks_elected_leader[group_id].append(callback) + + @abc.abstractmethod + def unwatch_elected_as_leader(self, group_id, callback): + """Call a function when member gets elected as leader. + + The callback functions will be executed when `run_watchers` is + called. + + :param group_id: The group id to watch + :param callback: The function to execute when a member leaves this + group + + """ + self._hooks_elected_leader[group_id].remove(callback) + if not self._hooks.elected_leader[group_id]: + del self._hooks.elected_leader[group_id] + + @staticmethod + def stand_down_group_leader(group_id): + """Stand down as the group leader if we are. + + :param group_id: The group where we don't want to be a leader anymore + """ + raise NotImplementedError + def start(self, timeout): """Start the service engine. diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index 3be2e25..8535cd9 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -237,6 +237,14 @@ class MemcachedDriver(coordination.CoordinationDriver): return super(MemcachedDriver, self).unwatch_leave_group( group_id, callback) + @staticmethod + def watch_elected_as_leader(group_id, callback): + raise NotImplementedError + + @staticmethod + def unwatch_elected_as_leader(group_id, callback): + raise NotImplementedError + def run_watchers(self): result = [] for group_id in self.client.get(self._GROUP_LIST_KEY): diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py index 6434f8e..9a7344b 100644 --- a/tooz/drivers/zookeeper.py +++ b/tooz/drivers/zookeeper.py @@ -42,7 +42,8 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): raise coordination.ToozError("operation error: %s" % (e)) self._group_members = collections.defaultdict(set) - self._children_changes = six.moves.queue.Queue() + self._watchers = six.moves.queue.Queue() + self._leader_locks = {} def stop(self): self._coord.stop() @@ -216,7 +217,7 @@ class KazooDriver(BaseZooKeeperDriver): # Copy function in case it's removed later from the # hook list hooks = copy.copy(self._hooks_join_group[group_id]) - self._children_changes.put( + self._watchers.put( lambda: hooks.run( coordination.MemberJoinedGroup( group_id, @@ -226,7 +227,7 @@ class KazooDriver(BaseZooKeeperDriver): # Copy function in case it's removed later from the # hook list hooks = copy.copy(self._hooks_leave_group[group_id]) - self._children_changes.put( + self._watchers.put( lambda: hooks.run( coordination.MemberLeftGroup( group_id, @@ -287,14 +288,41 @@ class KazooDriver(BaseZooKeeperDriver): return super(BaseZooKeeperDriver, self).unwatch_leave_group( group_id, callback) + def watch_elected_as_leader(self, group_id, callback): + return super(BaseZooKeeperDriver, self).watch_elected_as_leader( + group_id, callback) + + def unwatch_elected_as_leader(self, group_id, callback): + return super(BaseZooKeeperDriver, self).unwatch_elected_as_leader( + group_id, callback) + + def stand_down_group_leader(self, group_id): + if group_id in self._leader_locks: + self._leader_locks[group_id].release() + return True + return False + def run_watchers(self): ret = [] while True: try: - cb = self._children_changes.get(block=False) + cb = self._watchers.get(block=False) except six.moves.queue.Empty: break ret.extend(cb()) + + for group_id in six.iterkeys(self._hooks_elected_leader): + if group_id not in self._leader_locks: + self._leader_locks[group_id] = self._coord.Lock( + self._path_group(group_id)) + + if self._leader_locks[group_id].acquire(blocking=False): + # We are now leader for this group + self._hooks_elected_leader[group_id].run( + coordination.LeaderElected( + group_id, + self._member_id)) + return ret @@ -329,6 +357,14 @@ class ZakeDriver(BaseZooKeeperDriver): raise NotImplementedError @staticmethod + def watch_elected_as_leader(group_id, callback): + raise NotImplementedError + + @staticmethod + def unwatch_elected_as_leader(group_id, callback): + raise NotImplementedError + + @staticmethod def run_watchers(): raise NotImplementedError diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py index 9eb8e83..718cc56 100644 --- a/tooz/tests/test_coordination.py +++ b/tooz/tests/test_coordination.py @@ -347,6 +347,94 @@ class TestAPI(testscenarios.TestWithScenarios, lambda: None) self.assertEqual(0, len(self._coord._hooks_leave_group[self.group_id])) + def test_run_for_election(self): + self._coord.create_group(self.group_id).get() + self._coord.watch_elected_as_leader(self.group_id, self._set_event) + self._coord.run_watchers() + + self.assertIsInstance(self.event, + tooz.coordination.LeaderElected) + self.assertEqual(self.member_id, + self.event.member_id) + self.assertEqual(self.group_id, + self.event.group_id) + + def test_run_for_election_multiple_clients(self): + self._coord.create_group(self.group_id).get() + self._coord.watch_elected_as_leader(self.group_id, self._set_event) + self._coord.run_watchers() + + member_id_test2 = self._get_random_uuid() + client2 = tooz.coordination.get_coordinator(self.backend, + member_id_test2, + **self.kwargs) + client2.start() + client2.watch_elected_as_leader(self.group_id, self._set_event) + client2.run_watchers() + + self.assertIsInstance(self.event, + tooz.coordination.LeaderElected) + self.assertEqual(self.member_id, + self.event.member_id) + self.assertEqual(self.group_id, + self.event.group_id) + + self.event = None + + self._coord.stop() + client2.run_watchers() + + self.assertIsInstance(self.event, + tooz.coordination.LeaderElected) + self.assertEqual(member_id_test2, + self.event.member_id) + self.assertEqual(self.group_id, + self.event.group_id) + + def test_run_for_election_multiple_clients_stand_down(self): + self._coord.create_group(self.group_id).get() + self._coord.watch_elected_as_leader(self.group_id, self._set_event) + self._coord.run_watchers() + + member_id_test2 = self._get_random_uuid() + client2 = tooz.coordination.get_coordinator(self.backend, + member_id_test2, + **self.kwargs) + client2.start() + client2.watch_elected_as_leader(self.group_id, self._set_event) + client2.run_watchers() + + self.assertIsInstance(self.event, + tooz.coordination.LeaderElected) + self.assertEqual(self.member_id, + self.event.member_id) + self.assertEqual(self.group_id, + self.event.group_id) + + self.event = None + + self._coord.stand_down_group_leader(self.group_id) + client2.run_watchers() + + self.assertIsInstance(self.event, + tooz.coordination.LeaderElected) + self.assertEqual(member_id_test2, + self.event.member_id) + self.assertEqual(self.group_id, + self.event.group_id) + + self.event = None + + client2.stand_down_group_leader(self.group_id) + self._coord.run_watchers() + + self.assertIsInstance(self.event, + tooz.coordination.LeaderElected) + self.assertEqual(self.member_id, + self.event.member_id) + self.assertEqual(self.group_id, + self.event.group_id) + @staticmethod def _get_random_uuid(): return str(uuid.uuid4()).encode('ascii') |