summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJulien Danjou <julien@danjou.info>2014-08-21 17:09:56 +0200
committerJulien Danjou <julien@danjou.info>2014-08-21 17:10:30 +0200
commit7221c70469db19e620e1d0ae77c25311d8f5d04e (patch)
tree575aafd3fd09fcf018b7d6162a6563840b6e89ab
parentfe7af7b3c10bc60133a3fb8a9eade6f25da5f061 (diff)
downloadtooz-7221c70469db19e620e1d0ae77c25311d8f5d04e.tar.gz
memcached: switch leader election implementation to a lock
We reuse our lock code to manage the leader election, a.k.a. eat our own dog food. This also fixes the issue that all lock were not released when stop() was called. Change-Id: Ibbec3e8ac29d55247ec3b1b8941d5a9569b67677
-rw-r--r--tooz/drivers/memcached.py35
1 files changed, 16 insertions, 19 deletions
diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py
index 026bd63..7f24c5c 100644
--- a/tooz/drivers/memcached.py
+++ b/tooz/drivers/memcached.py
@@ -71,6 +71,9 @@ class MemcachedLock(locking.Lock):
self.coord.client.touch(self.name,
expire=self.timeout)
+ def get_owner(self):
+ return self.coord.client.get(self.name)
+
class MemcachedDriver(coordination.CoordinationDriver):
@@ -123,13 +126,12 @@ class MemcachedDriver(coordination.CoordinationDriver):
self.heartbeat()
def stop(self):
+ for lock in self._acquired_locks:
+ lock.release()
+
self.client.delete(self._encode_member_id(self._member_id))
map(self.leave_group, list(self._groups))
- for group_id in six.iterkeys(self._hooks_elected_leader):
- if self.get_leader(group_id).get() == self._member_id:
- self.client.delete(self._encode_group_leader(group_id))
-
self.client.close()
def _encode_group_id(self, group_id):
@@ -255,7 +257,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
def get_leader(self, group_id):
return MemcachedAsyncResult(
- self.client.get(self._encode_group_leader(group_id)))
+ self._get_leader_lock(group_id).get_owner())
def heartbeat(self):
self.client.set(self._encode_member_id(self._member_id),
@@ -265,11 +267,6 @@ class MemcachedDriver(coordination.CoordinationDriver):
for lock in self._acquired_locks:
lock.heartbeat()
- for group_id in six.iterkeys(self._hooks_elected_leader):
- if self.get_leader(group_id).get() == self._member_id:
- self.client.touch(self._encode_group_leader(group_id),
- expire=self.leader_timeout)
-
def _init_watch_group(self, group_id):
members = self.client.get(self._encode_group_id(group_id))
if members is None:
@@ -307,6 +304,10 @@ class MemcachedDriver(coordination.CoordinationDriver):
def get_lock(self, name):
return MemcachedLock(self, name, self.lock_timeout)
+ def _get_leader_lock(self, group_id):
+ return MemcachedLock(self, self._encode_group_leader(group_id),
+ self.leader_timeout)
+
def run_watchers(self):
result = []
for group_id in self.client.get(self._GROUP_LIST_KEY):
@@ -328,18 +329,14 @@ class MemcachedDriver(coordination.CoordinationDriver):
self._group_members[group_id] = group_members
- for group_id in six.iterkeys(self._hooks_elected_leader):
- lock_id = self._encode_group_leader(group_id)
+ for group_id, hooks in six.iteritems(self._hooks_elected_leader):
# Try to grab the lock, if that fails, that means someone has it
# already.
- if self.client.add(lock_id, self._member_id,
- expire=self.leader_timeout,
- noreply=False):
+ if self._get_leader_lock(group_id).acquire(blocking=False):
# We got the lock
- self._hooks_elected_leader[group_id].run(
- coordination.LeaderElected(
- group_id,
- self._member_id))
+ hooks.run(coordination.LeaderElected(
+ group_id,
+ self._member_id))
return result