summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-08-26 16:13:26 +0000
committerGerrit Code Review <review@openstack.org>2014-08-26 16:13:26 +0000
commit0525da6acffce438b26013e4621423bc88bb0fb1 (patch)
treea0bf513620e7f484645dbd5efe1cbaace0fb09db
parent9d7a153c9815385c3fd651fc3024825c6677fd10 (diff)
parent7221c70469db19e620e1d0ae77c25311d8f5d04e (diff)
downloadtooz-0525da6acffce438b26013e4621423bc88bb0fb1.tar.gz
Merge "memcached: switch leader election implementation to a lock"
-rw-r--r--tooz/drivers/memcached.py35
1 files changed, 16 insertions, 19 deletions
diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py
index 026bd63..7f24c5c 100644
--- a/tooz/drivers/memcached.py
+++ b/tooz/drivers/memcached.py
@@ -71,6 +71,9 @@ class MemcachedLock(locking.Lock):
self.coord.client.touch(self.name,
expire=self.timeout)
+ def get_owner(self):
+ return self.coord.client.get(self.name)
+
class MemcachedDriver(coordination.CoordinationDriver):
@@ -123,13 +126,12 @@ class MemcachedDriver(coordination.CoordinationDriver):
self.heartbeat()
def stop(self):
+ for lock in self._acquired_locks:
+ lock.release()
+
self.client.delete(self._encode_member_id(self._member_id))
map(self.leave_group, list(self._groups))
- for group_id in six.iterkeys(self._hooks_elected_leader):
- if self.get_leader(group_id).get() == self._member_id:
- self.client.delete(self._encode_group_leader(group_id))
-
self.client.close()
def _encode_group_id(self, group_id):
@@ -255,7 +257,7 @@ class MemcachedDriver(coordination.CoordinationDriver):
def get_leader(self, group_id):
return MemcachedAsyncResult(
- self.client.get(self._encode_group_leader(group_id)))
+ self._get_leader_lock(group_id).get_owner())
def heartbeat(self):
self.client.set(self._encode_member_id(self._member_id),
@@ -265,11 +267,6 @@ class MemcachedDriver(coordination.CoordinationDriver):
for lock in self._acquired_locks:
lock.heartbeat()
- for group_id in six.iterkeys(self._hooks_elected_leader):
- if self.get_leader(group_id).get() == self._member_id:
- self.client.touch(self._encode_group_leader(group_id),
- expire=self.leader_timeout)
-
def _init_watch_group(self, group_id):
members = self.client.get(self._encode_group_id(group_id))
if members is None:
@@ -307,6 +304,10 @@ class MemcachedDriver(coordination.CoordinationDriver):
def get_lock(self, name):
return MemcachedLock(self, name, self.lock_timeout)
+ def _get_leader_lock(self, group_id):
+ return MemcachedLock(self, self._encode_group_leader(group_id),
+ self.leader_timeout)
+
def run_watchers(self):
result = []
for group_id in self.client.get(self._GROUP_LIST_KEY):
@@ -328,18 +329,14 @@ class MemcachedDriver(coordination.CoordinationDriver):
self._group_members[group_id] = group_members
- for group_id in six.iterkeys(self._hooks_elected_leader):
- lock_id = self._encode_group_leader(group_id)
+ for group_id, hooks in six.iteritems(self._hooks_elected_leader):
# Try to grab the lock, if that fails, that means someone has it
# already.
- if self.client.add(lock_id, self._member_id,
- expire=self.leader_timeout,
- noreply=False):
+ if self._get_leader_lock(group_id).acquire(blocking=False):
# We got the lock
- self._hooks_elected_leader[group_id].run(
- coordination.LeaderElected(
- group_id,
- self._member_id))
+ hooks.run(coordination.LeaderElected(
+ group_id,
+ self._member_id))
return result