diff options
Diffstat (limited to 'kazoo/recipe/partitioner.py')
-rw-r--r-- | kazoo/recipe/partitioner.py | 48 |
1 files changed, 32 insertions, 16 deletions
diff --git a/kazoo/recipe/partitioner.py b/kazoo/recipe/partitioner.py index c552ea2..21dc6ef 100644 --- a/kazoo/recipe/partitioner.py +++ b/kazoo/recipe/partitioner.py @@ -56,6 +56,7 @@ class PartitionState(object): be recreated. """ + ALLOCATING = "ALLOCATING" ACQUIRED = "ACQUIRED" RELEASE = "RELEASE" @@ -135,9 +136,18 @@ class SetPartitioner(object): The current partition was released and is being re-allocated. """ - def __init__(self, client, path, set, partition_func=None, - identifier=None, time_boundary=30, max_reaction_time=1, - state_change_event=None): + + def __init__( + self, + client, + path, + set, + partition_func=None, + identifier=None, + time_boundary=30, + max_reaction_time=1, + state_change_event=None, + ): """Create a :class:`~SetPartitioner` instance :param client: A :class:`~kazoo.client.KazooClient` instance. @@ -159,19 +169,22 @@ class SetPartitioner(object): # Used to differentiate two states with the same names in time self.state_id = 0 self.state = PartitionState.ALLOCATING - self.state_change_event = state_change_event or \ - client.handler.event_object() + self.state_change_event = ( + state_change_event or client.handler.event_object() + ) self._client = client self._path = path self._set = set self._partition_set = [] self._partition_func = partition_func or self._partitioner - self._identifier = identifier or '%s-%s' % ( - socket.getfqdn(), os.getpid()) + self._identifier = identifier or "%s-%s" % ( + socket.getfqdn(), + os.getpid(), + ) self._locks = [] - self._lock_path = '/'.join([path, 'locks']) - self._party_path = '/'.join([path, 'party']) + self._lock_path = "/".join([path, "locks"]) + self._party_path = "/".join([path, "party"]) self._time_boundary = time_boundary self._max_reaction_time = max_reaction_time @@ -183,8 +196,9 @@ class SetPartitioner(object): client.ensure_path(self._party_path) # Join the party - self._party = client.ShallowParty(self._party_path, - identifier=self._identifier) + self._party = client.ShallowParty( + self._party_path, identifier=self._identifier + ) self._party.join() self._state_change = client.handler.rlock_object() @@ -313,11 +327,12 @@ class SetPartitioner(object): # Split up the set partition_set = self._partition_func( - self._identifier, list(self._party), self._set) + self._identifier, list(self._party), self._set + ) # Proceed to acquire locks for the working set as needed for member in partition_set: - lock = self._client.Lock(self._lock_path + '/' + str(member)) + lock = self._client.Lock(self._lock_path + "/" + str(member)) while True: try: @@ -386,8 +401,9 @@ class SetPartitioner(object): :param client_handler: If True, deliver the result using the client's event handler. """ - watcher = PatientChildrenWatch(self._client, self._party_path, - self._time_boundary) + watcher = PatientChildrenWatch( + self._client, self._party_path, self._time_boundary + ) asy = watcher.start() if func is not None: # We spin up the function in a separate thread/greenlet @@ -419,7 +435,7 @@ class SetPartitioner(object): i = workers.index(identifier) # Now return the partition list starting at our location and # skipping the other workers - return all_partitions[i::len(workers)] + return all_partitions[i :: len(workers)] def _set_state(self, state): self.state = state |