diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-03-06 10:33:12 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-03-09 10:43:37 -0800 |
commit | a970870a744a913a90a274436f04d46aead6dd36 (patch) | |
tree | 402a616dc1736eb9313f8ecc5884a7948529d1f8 | |
parent | 218a9014b749e52a2b8d40da6e3443c8132b8fa1 (diff) | |
download | kafka-python-clean_drain_coordinator_requests.tar.gz |
Avoid unknown coordinator after client pollclean_drain_coordinator_requests
-rw-r--r-- | kafka/coordinator/base.py | 13 |
1 files changed, 6 insertions, 7 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index e811e88..68b1bda 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -245,13 +245,12 @@ class BaseCoordinator(object): # ensure that there are no pending requests to the coordinator. # This is important in particular to avoid resending a pending # JoinGroup request. - if self._client.in_flight_request_count(self.coordinator_id): - while not self.coordinator_unknown(): - self._client.poll(delayed_tasks=False) - if not self._client.in_flight_request_count(self.coordinator_id): - break - else: - continue + while not self.coordinator_unknown(): + if not self._client.in_flight_request_count(self.coordinator_id): + break + self._client.poll(delayed_tasks=False) + else: + continue future = self._send_join_group_request() self._client.poll(future=future) |