diff options
author | Julien Danjou <julien@danjou.info> | 2014-08-21 17:09:56 +0200 |
---|---|---|
committer | Julien Danjou <julien@danjou.info> | 2014-08-21 17:10:30 +0200 |
commit | 7221c70469db19e620e1d0ae77c25311d8f5d04e (patch) | |
tree | 575aafd3fd09fcf018b7d6162a6563840b6e89ab | |
parent | fe7af7b3c10bc60133a3fb8a9eade6f25da5f061 (diff) | |
download | tooz-7221c70469db19e620e1d0ae77c25311d8f5d04e.tar.gz |
memcached: switch leader election implementation to a lock
We reuse our lock code to manage the leader election, a.k.a. eat our own
dog food.
This also fixes the issue that all lock were not released when stop()
was called.
Change-Id: Ibbec3e8ac29d55247ec3b1b8941d5a9569b67677
-rw-r--r-- | tooz/drivers/memcached.py | 35 |
1 files changed, 16 insertions, 19 deletions
diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index 026bd63..7f24c5c 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -71,6 +71,9 @@ class MemcachedLock(locking.Lock): self.coord.client.touch(self.name, expire=self.timeout) + def get_owner(self): + return self.coord.client.get(self.name) + class MemcachedDriver(coordination.CoordinationDriver): @@ -123,13 +126,12 @@ class MemcachedDriver(coordination.CoordinationDriver): self.heartbeat() def stop(self): + for lock in self._acquired_locks: + lock.release() + self.client.delete(self._encode_member_id(self._member_id)) map(self.leave_group, list(self._groups)) - for group_id in six.iterkeys(self._hooks_elected_leader): - if self.get_leader(group_id).get() == self._member_id: - self.client.delete(self._encode_group_leader(group_id)) - self.client.close() def _encode_group_id(self, group_id): @@ -255,7 +257,7 @@ class MemcachedDriver(coordination.CoordinationDriver): def get_leader(self, group_id): return MemcachedAsyncResult( - self.client.get(self._encode_group_leader(group_id))) + self._get_leader_lock(group_id).get_owner()) def heartbeat(self): self.client.set(self._encode_member_id(self._member_id), @@ -265,11 +267,6 @@ class MemcachedDriver(coordination.CoordinationDriver): for lock in self._acquired_locks: lock.heartbeat() - for group_id in six.iterkeys(self._hooks_elected_leader): - if self.get_leader(group_id).get() == self._member_id: - self.client.touch(self._encode_group_leader(group_id), - expire=self.leader_timeout) - def _init_watch_group(self, group_id): members = self.client.get(self._encode_group_id(group_id)) if members is None: @@ -307,6 +304,10 @@ class MemcachedDriver(coordination.CoordinationDriver): def get_lock(self, name): return MemcachedLock(self, name, self.lock_timeout) + def _get_leader_lock(self, group_id): + return MemcachedLock(self, self._encode_group_leader(group_id), + self.leader_timeout) + def run_watchers(self): result = [] for group_id in self.client.get(self._GROUP_LIST_KEY): @@ -328,18 +329,14 @@ class MemcachedDriver(coordination.CoordinationDriver): self._group_members[group_id] = group_members - for group_id in six.iterkeys(self._hooks_elected_leader): - lock_id = self._encode_group_leader(group_id) + for group_id, hooks in six.iteritems(self._hooks_elected_leader): # Try to grab the lock, if that fails, that means someone has it # already. - if self.client.add(lock_id, self._member_id, - expire=self.leader_timeout, - noreply=False): + if self._get_leader_lock(group_id).acquire(blocking=False): # We got the lock - self._hooks_elected_leader[group_id].run( - coordination.LeaderElected( - group_id, - self._member_id)) + hooks.run(coordination.LeaderElected( + group_id, + self._member_id)) return result |