diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-08-26 16:13:26 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-08-26 16:13:26 +0000 |
commit | 0525da6acffce438b26013e4621423bc88bb0fb1 (patch) | |
tree | a0bf513620e7f484645dbd5efe1cbaace0fb09db | |
parent | 9d7a153c9815385c3fd651fc3024825c6677fd10 (diff) | |
parent | 7221c70469db19e620e1d0ae77c25311d8f5d04e (diff) | |
download | tooz-0525da6acffce438b26013e4621423bc88bb0fb1.tar.gz |
Merge "memcached: switch leader election implementation to a lock"
-rw-r--r-- | tooz/drivers/memcached.py | 35 |
1 files changed, 16 insertions, 19 deletions
diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index 026bd63..7f24c5c 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -71,6 +71,9 @@ class MemcachedLock(locking.Lock): self.coord.client.touch(self.name, expire=self.timeout) + def get_owner(self): + return self.coord.client.get(self.name) + class MemcachedDriver(coordination.CoordinationDriver): @@ -123,13 +126,12 @@ class MemcachedDriver(coordination.CoordinationDriver): self.heartbeat() def stop(self): + for lock in self._acquired_locks: + lock.release() + self.client.delete(self._encode_member_id(self._member_id)) map(self.leave_group, list(self._groups)) - for group_id in six.iterkeys(self._hooks_elected_leader): - if self.get_leader(group_id).get() == self._member_id: - self.client.delete(self._encode_group_leader(group_id)) - self.client.close() def _encode_group_id(self, group_id): @@ -255,7 +257,7 @@ class MemcachedDriver(coordination.CoordinationDriver): def get_leader(self, group_id): return MemcachedAsyncResult( - self.client.get(self._encode_group_leader(group_id))) + self._get_leader_lock(group_id).get_owner()) def heartbeat(self): self.client.set(self._encode_member_id(self._member_id), @@ -265,11 +267,6 @@ class MemcachedDriver(coordination.CoordinationDriver): for lock in self._acquired_locks: lock.heartbeat() - for group_id in six.iterkeys(self._hooks_elected_leader): - if self.get_leader(group_id).get() == self._member_id: - self.client.touch(self._encode_group_leader(group_id), - expire=self.leader_timeout) - def _init_watch_group(self, group_id): members = self.client.get(self._encode_group_id(group_id)) if members is None: @@ -307,6 +304,10 @@ class MemcachedDriver(coordination.CoordinationDriver): def get_lock(self, name): return MemcachedLock(self, name, self.lock_timeout) + def _get_leader_lock(self, group_id): + return MemcachedLock(self, self._encode_group_leader(group_id), + self.leader_timeout) + def run_watchers(self): result = [] for group_id in self.client.get(self._GROUP_LIST_KEY): @@ -328,18 +329,14 @@ class MemcachedDriver(coordination.CoordinationDriver): self._group_members[group_id] = group_members - for group_id in six.iterkeys(self._hooks_elected_leader): - lock_id = self._encode_group_leader(group_id) + for group_id, hooks in six.iteritems(self._hooks_elected_leader): # Try to grab the lock, if that fails, that means someone has it # already. - if self.client.add(lock_id, self._member_id, - expire=self.leader_timeout, - noreply=False): + if self._get_leader_lock(group_id).acquire(blocking=False): # We got the lock - self._hooks_elected_leader[group_id].run( - coordination.LeaderElected( - group_id, - self._member_id)) + hooks.run(coordination.LeaderElected( + group_id, + self._member_id)) return result |