summaryrefslogtreecommitdiff
path: root/kazoo/recipe/partitioner.py
diff options
context:
space:
mode:
Diffstat (limited to 'kazoo/recipe/partitioner.py')
-rw-r--r--kazoo/recipe/partitioner.py48
1 files changed, 32 insertions, 16 deletions
diff --git a/kazoo/recipe/partitioner.py b/kazoo/recipe/partitioner.py
index c552ea2..21dc6ef 100644
--- a/kazoo/recipe/partitioner.py
+++ b/kazoo/recipe/partitioner.py
@@ -56,6 +56,7 @@ class PartitionState(object):
be recreated.
"""
+
ALLOCATING = "ALLOCATING"
ACQUIRED = "ACQUIRED"
RELEASE = "RELEASE"
@@ -135,9 +136,18 @@ class SetPartitioner(object):
The current partition was released and is being re-allocated.
"""
- def __init__(self, client, path, set, partition_func=None,
- identifier=None, time_boundary=30, max_reaction_time=1,
- state_change_event=None):
+
+ def __init__(
+ self,
+ client,
+ path,
+ set,
+ partition_func=None,
+ identifier=None,
+ time_boundary=30,
+ max_reaction_time=1,
+ state_change_event=None,
+ ):
"""Create a :class:`~SetPartitioner` instance
:param client: A :class:`~kazoo.client.KazooClient` instance.
@@ -159,19 +169,22 @@ class SetPartitioner(object):
# Used to differentiate two states with the same names in time
self.state_id = 0
self.state = PartitionState.ALLOCATING
- self.state_change_event = state_change_event or \
- client.handler.event_object()
+ self.state_change_event = (
+ state_change_event or client.handler.event_object()
+ )
self._client = client
self._path = path
self._set = set
self._partition_set = []
self._partition_func = partition_func or self._partitioner
- self._identifier = identifier or '%s-%s' % (
- socket.getfqdn(), os.getpid())
+ self._identifier = identifier or "%s-%s" % (
+ socket.getfqdn(),
+ os.getpid(),
+ )
self._locks = []
- self._lock_path = '/'.join([path, 'locks'])
- self._party_path = '/'.join([path, 'party'])
+ self._lock_path = "/".join([path, "locks"])
+ self._party_path = "/".join([path, "party"])
self._time_boundary = time_boundary
self._max_reaction_time = max_reaction_time
@@ -183,8 +196,9 @@ class SetPartitioner(object):
client.ensure_path(self._party_path)
# Join the party
- self._party = client.ShallowParty(self._party_path,
- identifier=self._identifier)
+ self._party = client.ShallowParty(
+ self._party_path, identifier=self._identifier
+ )
self._party.join()
self._state_change = client.handler.rlock_object()
@@ -313,11 +327,12 @@ class SetPartitioner(object):
# Split up the set
partition_set = self._partition_func(
- self._identifier, list(self._party), self._set)
+ self._identifier, list(self._party), self._set
+ )
# Proceed to acquire locks for the working set as needed
for member in partition_set:
- lock = self._client.Lock(self._lock_path + '/' + str(member))
+ lock = self._client.Lock(self._lock_path + "/" + str(member))
while True:
try:
@@ -386,8 +401,9 @@ class SetPartitioner(object):
:param client_handler: If True, deliver the result using the
client's event handler.
"""
- watcher = PatientChildrenWatch(self._client, self._party_path,
- self._time_boundary)
+ watcher = PatientChildrenWatch(
+ self._client, self._party_path, self._time_boundary
+ )
asy = watcher.start()
if func is not None:
# We spin up the function in a separate thread/greenlet
@@ -419,7 +435,7 @@ class SetPartitioner(object):
i = workers.index(identifier)
# Now return the partition list starting at our location and
# skipping the other workers
- return all_partitions[i::len(workers)]
+ return all_partitions[i :: len(workers)]
def _set_state(self, state):
self.state = state