summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-11-18 09:20:41 -0800
committerDana Powers <dana.powers@gmail.com>2016-11-18 09:20:41 -0800
commit92ed46eee41e6910c78357a6092173dfb2abb532 (patch)
treeb1fd9781ca80311e33a3ef9a96aa688a46f348ab
parent77591afa789a4752f4d385228bea980f448f6a08 (diff)
downloadkafka-python-coordinator_dead_error_logging.tar.gz
Always include an error for logging when the coordinator is marked deadcoordinator_dead_error_logging
-rw-r--r--kafka/coordinator/base.py14
-rw-r--r--kafka/coordinator/consumer.py4
2 files changed, 9 insertions, 9 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 5f60aa3..22dffb4 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -190,7 +190,7 @@ class BaseCoordinator(object):
return True
if self._client.is_disconnected(self.coordinator_id):
- self.coordinator_dead()
+ self.coordinator_dead('Node Disconnected')
return True
return False
@@ -311,7 +311,7 @@ class BaseCoordinator(object):
# unless the error is caused by internal client pipelining
if not isinstance(error, (Errors.NodeNotReadyError,
Errors.TooManyInFlightRequests)):
- self.coordinator_dead()
+ self.coordinator_dead(error)
future.failure(error)
def _handle_join_group_response(self, future, send_time, response):
@@ -348,7 +348,7 @@ class BaseCoordinator(object):
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
# re-discover the coordinator and retry with backoff
- self.coordinator_dead()
+ self.coordinator_dead(error_type())
log.debug("Attempt to join group %s failed due to obsolete "
"coordinator information: %s", self.group_id,
error_type.__name__)
@@ -448,7 +448,7 @@ class BaseCoordinator(object):
Errors.NotCoordinatorForGroupError):
error = error_type()
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
- self.coordinator_dead()
+ self.coordinator_dead(error)
future.failure(error)
else:
error = error_type()
@@ -513,7 +513,7 @@ class BaseCoordinator(object):
error)
future.failure(error)
- def coordinator_dead(self, error=None):
+ def coordinator_dead(self, error):
"""Mark the current coordinator as dead."""
if self.coordinator_id is not None:
log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
@@ -571,7 +571,7 @@ class BaseCoordinator(object):
log.warning("Heartbeat failed for group %s: coordinator (node %s)"
" is either not started or not valid", self.group_id,
self.coordinator_id)
- self.coordinator_dead()
+ self.coordinator_dead(error_type())
future.failure(error_type())
elif error_type is Errors.RebalanceInProgressError:
log.warning("Heartbeat failed for group %s because it is"
@@ -640,7 +640,7 @@ class HeartbeatTask(object):
# we haven't received a successful heartbeat in one session interval
# so mark the coordinator dead
log.error("Heartbeat session expired - marking coordinator dead")
- self._coordinator.coordinator_dead()
+ self._coordinator.coordinator_dead('Heartbeat session expired')
return
if not self._heartbeat.should_heartbeat():
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index a600cb4..fac8144 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -525,7 +525,7 @@ class ConsumerCoordinator(BaseCoordinator):
Errors.RequestTimedOutError):
log.debug("OffsetCommit for group %s failed: %s",
self.group_id, error_type.__name__)
- self.coordinator_dead()
+ self.coordinator_dead(error_type())
future.failure(error_type(self.group_id))
return
elif error_type in (Errors.UnknownMemberIdError,
@@ -630,7 +630,7 @@ class ConsumerCoordinator(BaseCoordinator):
future.failure(error)
elif error_type is Errors.NotCoordinatorForGroupError:
# re-discover the coordinator and retry
- self.coordinator_dead()
+ self.coordinator_dead(error_type())
future.failure(error)
elif error_type in (Errors.UnknownMemberIdError,
Errors.IllegalGenerationError):