summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-11 16:47:46 -0800
committerDana Powers <dana.powers@gmail.com>2016-03-12 15:08:36 -0800
commit86d98c00fdda7f0d9f2cccb64e2128977bd5ee8d (patch)
treeffed3262250e91828b8ccc6a81791523c6a5a939 /kafka
parent0057e75ecfff12dcc16ec5b285c7288666798552 (diff)
downloadkafka-python-86d98c00fdda7f0d9f2cccb64e2128977bd5ee8d.tar.gz
Log successful heartbeat as INFO; improve heartbeat response logging
Diffstat (limited to 'kafka')
-rw-r--r--kafka/coordinator/base.py23
1 files changed, 12 insertions, 11 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index a2c47a4..dca809e 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -536,26 +536,27 @@ class BaseCoordinator(object):
#self.sensors.heartbeat_latency.record(response.requestLatencyMs())
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- log.debug("Received successful heartbeat response.")
+ log.info("Heartbeat successful")
future.success(None)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
- log.info("Heartbeat failed: coordinator is either not started or"
- " not valid; will refresh metadata and retry")
+ log.warning("Heartbeat failed: coordinator is either not started or"
+ " not valid; will refresh metadata and retry")
self.coordinator_dead()
future.failure(error_type())
elif error_type is Errors.RebalanceInProgressError:
- log.info("Heartbeat failed: group is rebalancing; re-joining group")
+ log.warning("Heartbeat: group is rebalancing; this consumer needs to"
+ " re-join")
self.rejoin_needed = True
future.failure(error_type())
elif error_type is Errors.IllegalGenerationError:
- log.info("Heartbeat failed: local generation id is not current;"
- " re-joining group")
+ log.warning("Heartbeat: generation id is not current; this consumer"
+ " needs to re-join")
self.rejoin_needed = True
future.failure(error_type())
elif error_type is Errors.UnknownMemberIdError:
- log.info("Heartbeat failed: local member_id was not recognized;"
- " resetting and re-joining group")
+ log.warning("Heartbeat: local member_id was not recognized;"
+ " this consumer needs to re-join")
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
self.rejoin_needed = True
future.failure(error_type)
@@ -597,8 +598,8 @@ class HeartbeatTask(object):
self._coordinator.need_rejoin()):
# no need to send the heartbeat we're not using auto-assignment
# or if we are awaiting a rebalance
- log.debug("Skipping heartbeat: no auto-assignment"
- " or waiting on rebalance")
+ log.info("Skipping heartbeat: no auto-assignment"
+ " or waiting on rebalance")
return
if self._coordinator.coordinator_unknown():
@@ -633,7 +634,7 @@ class HeartbeatTask(object):
self._client.schedule(self, time.time() + ttl)
def _handle_heartbeat_failure(self, e):
- log.debug("Heartbeat failed; retrying")
+ log.warning("Heartbeat failed; retrying")
self._request_in_flight = False
etd = time.time() + self._coordinator.config['retry_backoff_ms'] / 1000.0
self._client.schedule(self, etd)