summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJulien Danjou <julien@danjou.info>2014-04-01 17:28:38 +0200
committerJulien Danjou <julien@danjou.info>2014-04-07 13:54:34 +0200
commitdda34fac81a9e92efb602e9a8ceb79e8d311dc00 (patch)
treedd155a9e45022efc528ae06e7906198361bada00
parent3123c5a1fe72a9944364deb9b9ff41092238989b (diff)
downloadtooz-dda34fac81a9e92efb602e9a8ceb79e8d311dc00.tar.gz
coordination, zookeeper: implement leader election
Change-Id: Ic3d1934b87be37ba56f744abd045adb634bcdb08
-rw-r--r--tooz/coordination.py47
-rw-r--r--tooz/drivers/memcached.py8
-rw-r--r--tooz/drivers/zookeeper.py44
-rw-r--r--tooz/tests/test_coordination.py88
4 files changed, 183 insertions, 4 deletions
diff --git a/tooz/coordination.py b/tooz/coordination.py
index d2d76ea..75fb6c3 100644
--- a/tooz/coordination.py
+++ b/tooz/coordination.py
@@ -48,12 +48,21 @@ class MemberLeftGroup(Event):
self.member_id = member_id
+class LeaderElected(Event):
+ """A leader as been elected."""
+
+ def __init__(self, group_id, member_id):
+ self.group_id = group_id
+ self.member_id = member_id
+
+
@six.add_metaclass(abc.ABCMeta)
class CoordinationDriver(object):
def __init__(self):
self._hooks_join_group = collections.defaultdict(Hooks)
self._hooks_leave_group = collections.defaultdict(Hooks)
+ self._hooks_elected_leader = collections.defaultdict(Hooks)
# A cache for group members
self._group_members = collections.defaultdict(set)
@@ -118,6 +127,44 @@ class CoordinationDriver(object):
and group_id in self._group_members):
del self._group_members[group_id]
+ @abc.abstractmethod
+ def watch_elected_as_leader(self, group_id, callback):
+ """Call a function when member gets elected as leader.
+
+ The callback functions will be executed when `run_watchers` is
+ called.
+
+ :param group_id: The group id to watch
+ :param callback: The function to execute when a member leaves this
+ group
+
+ """
+ self._hooks_elected_leader[group_id].append(callback)
+
+ @abc.abstractmethod
+ def unwatch_elected_as_leader(self, group_id, callback):
+ """Call a function when member gets elected as leader.
+
+ The callback functions will be executed when `run_watchers` is
+ called.
+
+ :param group_id: The group id to watch
+ :param callback: The function to execute when a member leaves this
+ group
+
+ """
+ self._hooks_elected_leader[group_id].remove(callback)
+ if not self._hooks.elected_leader[group_id]:
+ del self._hooks.elected_leader[group_id]
+
+ @staticmethod
+ def stand_down_group_leader(group_id):
+ """Stand down as the group leader if we are.
+
+ :param group_id: The group where we don't want to be a leader anymore
+ """
+ raise NotImplementedError
+
def start(self, timeout):
"""Start the service engine.
diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py
index 3be2e25..8535cd9 100644
--- a/tooz/drivers/memcached.py
+++ b/tooz/drivers/memcached.py
@@ -237,6 +237,14 @@ class MemcachedDriver(coordination.CoordinationDriver):
return super(MemcachedDriver, self).unwatch_leave_group(
group_id, callback)
+ @staticmethod
+ def watch_elected_as_leader(group_id, callback):
+ raise NotImplementedError
+
+ @staticmethod
+ def unwatch_elected_as_leader(group_id, callback):
+ raise NotImplementedError
+
def run_watchers(self):
result = []
for group_id in self.client.get(self._GROUP_LIST_KEY):
diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py
index 6434f8e..9a7344b 100644
--- a/tooz/drivers/zookeeper.py
+++ b/tooz/drivers/zookeeper.py
@@ -42,7 +42,8 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
raise coordination.ToozError("operation error: %s" % (e))
self._group_members = collections.defaultdict(set)
- self._children_changes = six.moves.queue.Queue()
+ self._watchers = six.moves.queue.Queue()
+ self._leader_locks = {}
def stop(self):
self._coord.stop()
@@ -216,7 +217,7 @@ class KazooDriver(BaseZooKeeperDriver):
# Copy function in case it's removed later from the
# hook list
hooks = copy.copy(self._hooks_join_group[group_id])
- self._children_changes.put(
+ self._watchers.put(
lambda: hooks.run(
coordination.MemberJoinedGroup(
group_id,
@@ -226,7 +227,7 @@ class KazooDriver(BaseZooKeeperDriver):
# Copy function in case it's removed later from the
# hook list
hooks = copy.copy(self._hooks_leave_group[group_id])
- self._children_changes.put(
+ self._watchers.put(
lambda: hooks.run(
coordination.MemberLeftGroup(
group_id,
@@ -287,14 +288,41 @@ class KazooDriver(BaseZooKeeperDriver):
return super(BaseZooKeeperDriver, self).unwatch_leave_group(
group_id, callback)
+ def watch_elected_as_leader(self, group_id, callback):
+ return super(BaseZooKeeperDriver, self).watch_elected_as_leader(
+ group_id, callback)
+
+ def unwatch_elected_as_leader(self, group_id, callback):
+ return super(BaseZooKeeperDriver, self).unwatch_elected_as_leader(
+ group_id, callback)
+
+ def stand_down_group_leader(self, group_id):
+ if group_id in self._leader_locks:
+ self._leader_locks[group_id].release()
+ return True
+ return False
+
def run_watchers(self):
ret = []
while True:
try:
- cb = self._children_changes.get(block=False)
+ cb = self._watchers.get(block=False)
except six.moves.queue.Empty:
break
ret.extend(cb())
+
+ for group_id in six.iterkeys(self._hooks_elected_leader):
+ if group_id not in self._leader_locks:
+ self._leader_locks[group_id] = self._coord.Lock(
+ self._path_group(group_id))
+
+ if self._leader_locks[group_id].acquire(blocking=False):
+ # We are now leader for this group
+ self._hooks_elected_leader[group_id].run(
+ coordination.LeaderElected(
+ group_id,
+ self._member_id))
+
return ret
@@ -329,6 +357,14 @@ class ZakeDriver(BaseZooKeeperDriver):
raise NotImplementedError
@staticmethod
+ def watch_elected_as_leader(group_id, callback):
+ raise NotImplementedError
+
+ @staticmethod
+ def unwatch_elected_as_leader(group_id, callback):
+ raise NotImplementedError
+
+ @staticmethod
def run_watchers():
raise NotImplementedError
diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py
index 9eb8e83..718cc56 100644
--- a/tooz/tests/test_coordination.py
+++ b/tooz/tests/test_coordination.py
@@ -347,6 +347,94 @@ class TestAPI(testscenarios.TestWithScenarios,
lambda: None)
self.assertEqual(0, len(self._coord._hooks_leave_group[self.group_id]))
+ def test_run_for_election(self):
+ self._coord.create_group(self.group_id).get()
+ self._coord.watch_elected_as_leader(self.group_id, self._set_event)
+ self._coord.run_watchers()
+
+ self.assertIsInstance(self.event,
+ tooz.coordination.LeaderElected)
+ self.assertEqual(self.member_id,
+ self.event.member_id)
+ self.assertEqual(self.group_id,
+ self.event.group_id)
+
+ def test_run_for_election_multiple_clients(self):
+ self._coord.create_group(self.group_id).get()
+ self._coord.watch_elected_as_leader(self.group_id, self._set_event)
+ self._coord.run_watchers()
+
+ member_id_test2 = self._get_random_uuid()
+ client2 = tooz.coordination.get_coordinator(self.backend,
+ member_id_test2,
+ **self.kwargs)
+ client2.start()
+ client2.watch_elected_as_leader(self.group_id, self._set_event)
+ client2.run_watchers()
+
+ self.assertIsInstance(self.event,
+ tooz.coordination.LeaderElected)
+ self.assertEqual(self.member_id,
+ self.event.member_id)
+ self.assertEqual(self.group_id,
+ self.event.group_id)
+
+ self.event = None
+
+ self._coord.stop()
+ client2.run_watchers()
+
+ self.assertIsInstance(self.event,
+ tooz.coordination.LeaderElected)
+ self.assertEqual(member_id_test2,
+ self.event.member_id)
+ self.assertEqual(self.group_id,
+ self.event.group_id)
+
+ def test_run_for_election_multiple_clients_stand_down(self):
+ self._coord.create_group(self.group_id).get()
+ self._coord.watch_elected_as_leader(self.group_id, self._set_event)
+ self._coord.run_watchers()
+
+ member_id_test2 = self._get_random_uuid()
+ client2 = tooz.coordination.get_coordinator(self.backend,
+ member_id_test2,
+ **self.kwargs)
+ client2.start()
+ client2.watch_elected_as_leader(self.group_id, self._set_event)
+ client2.run_watchers()
+
+ self.assertIsInstance(self.event,
+ tooz.coordination.LeaderElected)
+ self.assertEqual(self.member_id,
+ self.event.member_id)
+ self.assertEqual(self.group_id,
+ self.event.group_id)
+
+ self.event = None
+
+ self._coord.stand_down_group_leader(self.group_id)
+ client2.run_watchers()
+
+ self.assertIsInstance(self.event,
+ tooz.coordination.LeaderElected)
+ self.assertEqual(member_id_test2,
+ self.event.member_id)
+ self.assertEqual(self.group_id,
+ self.event.group_id)
+
+ self.event = None
+
+ client2.stand_down_group_leader(self.group_id)
+ self._coord.run_watchers()
+
+ self.assertIsInstance(self.event,
+ tooz.coordination.LeaderElected)
+ self.assertEqual(self.member_id,
+ self.event.member_id)
+ self.assertEqual(self.group_id,
+ self.event.group_id)
+
@staticmethod
def _get_random_uuid():
return str(uuid.uuid4()).encode('ascii')