author     Dana Powers <dana.powers@gmail.com>    2016-04-05 00:07:15 -0700
committer  Dana Powers <dana.powers@gmail.com>    2016-04-05 10:22:54 -0700
commit     145ac227cb7f471467de52c5016ed3727e417911 (patch)
tree       799ba40ae175f0bc6e5c5013991805eb6dab5fd1 /kafka/coordinator
parent     5a14bd8c947251d1a8f848175cc3cf2b07af3411 (diff)
KAFKA-3318: clean up consumer logging and error messages
Diffstat (limited to 'kafka/coordinator')
-rw-r--r--  kafka/coordinator/base.py      |  78
-rw-r--r--  kafka/coordinator/consumer.py  | 103
2 files changed, 103 insertions(+), 78 deletions(-)
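
The diff below applies one consistent logging convention across both files: routine, retriable coordinator errors (rebalances, coordinator moves, group loading) drop from info/warning to debug; group lifecycle transitions (joining a group, discovering a coordinator, assigning partitions) rise to info; and the group id is threaded into every message. A minimal sketch of that convention, using a hypothetical helper that does not appear in the patch itself:

    import logging

    log = logging.getLogger('kafka.coordinator')

    def log_group_event(group_id, event, error=None, retriable=False):
        # Hypothetical helper -- the patch applies this convention inline.
        if error is None:
            # lifecycle transitions are promoted to info
            log.info("%s for group %s", event, group_id)
        elif retriable:
            # expected, retriable failures are demoted to debug
            log.debug("%s for group %s failed: %s", event, group_id, error)
        else:
            # only fatal or unexpected conditions stay at error
            log.error("%s for group %s failed: %s", event, group_id, error)
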
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index fcf3901..3c7ea21 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -200,7 +200,7 @@ class BaseCoordinator(object):
self._client.poll()
continue
- future = self._send_group_metadata_request()
+ future = self._send_group_coordinator_request()
self._client.poll(future=future)
if future.failed():
@@ -233,7 +233,7 @@ class BaseCoordinator(object):
while self.need_rejoin():
self.ensure_coordinator_known()
- future = self._perform_group_join()
+ future = self._send_join_group_request()
self._client.poll(future=future)
if future.succeeded():
@@ -253,7 +253,7 @@ class BaseCoordinator(object):
raise exception # pylint: disable-msg=raising-bad-type
time.sleep(self.config['retry_backoff_ms'] / 1000.0)
- def _perform_group_join(self):
+ def _send_join_group_request(self):
"""Join the group and return the assignment for the next generation.
This function handles both JoinGroup and SyncGroup, delegating to
@@ -268,7 +268,7 @@ class BaseCoordinator(object):
return Future().failure(e)
# send a join group request to the coordinator
- log.debug("(Re-)joining group %s", self.group_id)
+ log.info("(Re-)joining group %s", self.group_id)
request = JoinGroupRequest(
self.group_id,
self.config['session_timeout_ms'],
@@ -279,7 +279,7 @@ class BaseCoordinator(object):
for protocol, metadata in self.group_protocols()])
# create the request for the coordinator
- log.debug("Issuing request (%s) to coordinator %s", request, self.coordinator_id)
+ log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_join_group_response, future)
@@ -300,6 +300,8 @@ class BaseCoordinator(object):
def _handle_join_group_response(self, future, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
+ log.debug("Received successful JoinGroup response for group %s: %s",
+ self.group_id, response)
self.member_id = response.member_id
self.generation = response.generation_id
self.rejoin_needed = False
@@ -315,30 +317,31 @@ class BaseCoordinator(object):
self._on_join_follower().chain(future)
elif error_type is Errors.GroupLoadInProgressError:
- log.debug("Attempt to join group %s rejected since coordinator is"
- " loading the group.", self.group_id)
+ log.debug("Attempt to join group %s rejected since coordinator %s"
+ " is loading the group.", self.group_id, self.coordinator_id)
# backoff and retry
future.failure(error_type(response))
elif error_type is Errors.UnknownMemberIdError:
# reset the member id and retry immediately
error = error_type(self.member_id)
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
- log.info("Attempt to join group %s failed due to unknown member id,"
- " resetting and retrying.", self.group_id)
+ log.debug("Attempt to join group %s failed due to unknown member id",
+ self.group_id)
future.failure(error)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
# re-discover the coordinator and retry with backoff
self.coordinator_dead()
- log.info("Attempt to join group %s failed due to obsolete "
- "coordinator information, retrying.", self.group_id)
+ log.debug("Attempt to join group %s failed due to obsolete "
+ "coordinator information: %s", self.group_id,
+ error_type.__name__)
future.failure(error_type())
elif error_type in (Errors.InconsistentGroupProtocolError,
Errors.InvalidSessionTimeoutError,
Errors.InvalidGroupIdError):
# log the error and re-throw the exception
error = error_type(response)
- log.error("Attempt to join group %s failed due to: %s",
+ log.error("Attempt to join group %s failed due to fatal error: %s",
self.group_id, error)
future.failure(error)
elif error_type is Errors.GroupAuthorizationFailedError:
@@ -356,8 +359,8 @@ class BaseCoordinator(object):
self.generation,
self.member_id,
{})
- log.debug("Issuing follower SyncGroup (%s) to coordinator %s",
- request, self.coordinator_id)
+ log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
+ self.group_id, self.coordinator_id, request)
return self._send_sync_group_request(request)
def _on_join_leader(self, response):
@@ -386,8 +389,8 @@ class BaseCoordinator(object):
assignment if isinstance(assignment, bytes) else assignment.encode())
for member_id, assignment in six.iteritems(group_assignment)])
- log.debug("Issuing leader SyncGroup (%s) to coordinator %s",
- request, self.coordinator_id)
+ log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s",
+ self.group_id, self.coordinator_id, request)
return self._send_sync_group_request(request)
def _send_sync_group_request(self, request):
@@ -404,8 +407,8 @@ class BaseCoordinator(object):
def _handle_sync_group_response(self, future, response):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- log.debug("Received successful sync group response for group %s: %s",
- self.group_id, response)
+ log.info("Successfully joined group %s with generation %s",
+ self.group_id, self.generation)
#self.sensors.syncLatency.record(response.requestLatencyMs())
future.success(response.member_assignment)
return
@@ -415,21 +418,19 @@ class BaseCoordinator(object):
if error_type is Errors.GroupAuthorizationFailedError:
future.failure(error_type(self.group_id))
elif error_type is Errors.RebalanceInProgressError:
- log.info("SyncGroup for group %s failed due to coordinator"
- " rebalance, rejoining the group", self.group_id)
+ log.debug("SyncGroup for group %s failed due to coordinator"
+ " rebalance", self.group_id)
future.failure(error_type(self.group_id))
elif error_type in (Errors.UnknownMemberIdError,
Errors.IllegalGenerationError):
error = error_type()
- log.info("SyncGroup for group %s failed due to %s,"
- " rejoining the group", self.group_id, error)
+ log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
future.failure(error)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
error = error_type()
- log.info("SyncGroup for group %s failed due to %s, will find new"
- " coordinator and rejoin", self.group_id, error)
+ log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
self.coordinator_dead()
future.failure(error)
else:
@@ -437,7 +438,7 @@ class BaseCoordinator(object):
log.error("Unexpected error from SyncGroup: %s", error)
future.failure(error)
- def _send_group_metadata_request(self):
+ def _send_group_coordinator_request(self):
"""Discover the current coordinator for the group.
Returns:
@@ -447,7 +448,8 @@ class BaseCoordinator(object):
if node_id is None:
return Future().failure(Errors.NoBrokersAvailable())
- log.debug("Issuing group metadata request to broker %s", node_id)
+ log.debug("Sending group coordinator request for group %s to broker %s",
+ self.group_id, node_id)
request = GroupCoordinatorRequest(self.group_id)
future = Future()
_f = self._client.send(node_id, request)
@@ -456,7 +458,7 @@ class BaseCoordinator(object):
return future
def _handle_group_coordinator_response(self, future, response):
- log.debug("Group metadata response %s", response)
+ log.debug("Received group coordinator response %s", response)
if not self.coordinator_unknown():
# We already found the coordinator, so ignore the request
log.debug("Coordinator already known -- ignoring metadata response")
@@ -473,6 +475,8 @@ class BaseCoordinator(object):
return
self.coordinator_id = response.coordinator_id
+ log.info("Discovered coordinator %s for group %s",
+ self.coordinator_id, self.group_id)
self._client.ready(self.coordinator_id)
# start sending heartbeats only if we have a valid generation
@@ -495,8 +499,8 @@ class BaseCoordinator(object):
def coordinator_dead(self, error=None):
"""Mark the current coordinator as dead."""
if self.coordinator_id is not None:
- log.warning("Marking the coordinator dead (node %s): %s.",
- self.coordinator_id, error)
+ log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
+ self.coordinator_id, self.group_id, error)
self.coordinator_id = None
def close(self):
@@ -542,22 +546,24 @@ class BaseCoordinator(object):
#self.sensors.heartbeat_latency.record(response.requestLatencyMs())
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
- log.info("Heartbeat successful")
+ log.debug("Received successful heartbeat response for group %s",
+ self.group_id)
future.success(None)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
- log.warning("Heartbeat failed: coordinator is either not started or"
- " not valid; will refresh metadata and retry")
+ log.warning("Heartbeat failed for group %s: coordinator (node %s)"
+ " is either not started or not valid", self.group_id,
+ self.coordinator_id)
self.coordinator_dead()
future.failure(error_type())
elif error_type is Errors.RebalanceInProgressError:
- log.warning("Heartbeat: group is rebalancing; this consumer needs to"
- " re-join")
+ log.warning("Heartbeat failed for group %s because it is"
+ " rebalancing", self.group_id)
self.rejoin_needed = True
future.failure(error_type())
elif error_type is Errors.IllegalGenerationError:
- log.warning("Heartbeat: generation id is not current; this consumer"
- " needs to re-join")
+ log.warning("Heartbeat failed for group %s: generation id is not "
+ " current.", self.group_id)
self.rejoin_needed = True
future.failure(error_type())
elif error_type is Errors.UnknownMemberIdError:
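
Every request/response pair in base.py rides on the same pattern: the client's send() returns a Future, and the coordinator attaches a response handler as a callback (with an outer Future prepended to its arguments) plus a failure propagator as an errback. The following is a simplified reduction of that contract, assuming the semantics of kafka/future.py (extra callback arguments come first, the result is appended last); it is an illustrative sketch, not the library's actual implementation:

    class Future(object):
        """Illustrative reduction of kafka-python's Future."""

        def __init__(self):
            self.is_done = False
            self.value = None
            self.exception = None
            self._callbacks = []
            self._errbacks = []

        def succeeded(self):
            return self.is_done and self.exception is None

        def failed(self):
            return self.is_done and self.exception is not None

        def success(self, value):
            self.is_done, self.value = True, value
            for f, args in self._callbacks:
                f(*(args + (value,)))   # extra args first, result last
            return self

        def failure(self, exception):
            self.is_done, self.exception = True, exception
            for f, args in self._errbacks:
                f(*(args + (exception,)))
            return self

        def add_callback(self, f, *args):
            if self.succeeded():
                f(*(args + (self.value,)))
            else:
                self._callbacks.append((f, args))
            return self

        def add_errback(self, f, *args):
            if self.failed():
                f(*(args + (self.exception,)))
            else:
                self._errbacks.append((f, args))
            return self

        def chain(self, other):
            # forward this future's outcome into another future
            self.add_callback(other.success)
            self.add_errback(other.failure)
            return self

Under this contract, a handler such as _handle_join_group_response(future, response) translates the broker's error code into either future.success(...) or future.failure(...), which is what the retry loops at the top of the file poll on.
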
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index ae2344f..3ce7570 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -198,15 +198,18 @@ class ConsumerCoordinator(BaseCoordinator):
self._auto_commit_task.enable()
assigned = set(self._subscription.assigned_partitions())
- log.debug("Set newly assigned partitions %s", assigned)
+ log.info("Setting newly assigned partitions %s for group %s",
+ assigned, self.group_id)
# execute the user's callback after rebalance
if self._subscription.listener:
try:
self._subscription.listener.on_partitions_assigned(assigned)
except Exception:
- log.exception("User provided listener failed on partition"
- " assignment: %s", assigned)
+ log.exception("User provided listener %s for group %s"
+ " failed on partition assignment: %s",
+ self._subscription.listener, self.group_id,
+ assigned)
def _perform_assignment(self, leader_id, assignment_strategy, members):
assignor = self._lookup_assignor(assignment_strategy)
@@ -226,12 +229,13 @@ class ConsumerCoordinator(BaseCoordinator):
self._subscription.group_subscribe(all_subscribed_topics)
self._client.set_topics(self._subscription.group_subscription())
- log.debug("Performing %s assignment for subscriptions %s",
- assignor.name, member_metadata)
+ log.debug("Performing assignment for group %s using strategy %s"
+ " with subscriptions %s", self.group_id, assignor.name,
+ member_metadata)
assignments = assignor.assign(self._cluster, member_metadata)
- log.debug("Finished assignment: %s", assignments)
+ log.debug("Finished assignment for group %s: %s", self.group_id, assignments)
group_assignment = {}
for member_id, assignment in six.iteritems(assignments):
@@ -243,15 +247,16 @@ class ConsumerCoordinator(BaseCoordinator):
self._maybe_auto_commit_offsets_sync()
# execute the user's callback before rebalance
- log.debug("Revoking previously assigned partitions %s",
- self._subscription.assigned_partitions())
+ log.info("Revoking previously assigned partitions %s for group %s",
+ self._subscription.assigned_partitions(), self.group_id)
if self._subscription.listener:
try:
revoked = set(self._subscription.assigned_partitions())
self._subscription.listener.on_partitions_revoked(revoked)
except Exception:
- log.exception("User provided subscription listener failed"
- " on_partitions_revoked")
+ log.exception("User provided subscription listener %s"
+ " for group %s failed on_partitions_revoked",
+ self._subscription.listener, self.group_id)
self._subscription.mark_for_reassignment()
@@ -462,8 +467,8 @@ class ConsumerCoordinator(BaseCoordinator):
) for topic, partitions in six.iteritems(offset_data)]
)
- log.debug("Sending offset-commit request with %s to %s",
- offsets, node_id)
+ log.debug("Sending offset-commit request with %s for group %s to %s",
+ offsets, self.group_id, node_id)
future = Future()
_f = self._client.send(node_id, request)
@@ -482,12 +487,13 @@ class ConsumerCoordinator(BaseCoordinator):
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
- log.debug("Committed offset %s for partition %s", offset, tp)
+ log.debug("Group %s committed offset %s for partition %s",
+ self.group_id, offset, tp)
if self._subscription.is_assigned(tp):
self._subscription.assignment[tp].committed = offset.offset
elif error_type is Errors.GroupAuthorizationFailedError:
- log.error("OffsetCommit failed for group %s - %s",
- self.group_id, error_type.__name__)
+ log.error("Not authorized to commit offsets for group %s",
+ self.group_id)
future.failure(error_type(self.group_id))
return
elif error_type is Errors.TopicAuthorizationFailedError:
@@ -495,24 +501,21 @@ class ConsumerCoordinator(BaseCoordinator):
elif error_type in (Errors.OffsetMetadataTooLargeError,
Errors.InvalidCommitOffsetSizeError):
# raise the error to the user
- log.info("OffsetCommit failed for group %s on partition %s"
- " due to %s, will retry", self.group_id, tp,
- error_type.__name__)
+ log.debug("OffsetCommit for group %s failed on partition %s"
+ " %s", self.group_id, tp, error_type.__name__)
future.failure(error_type())
return
elif error_type is Errors.GroupLoadInProgressError:
# just retry
- log.info("OffsetCommit failed for group %s because group is"
- " initializing (%s), will retry", self.group_id,
- error_type.__name__)
+ log.debug("OffsetCommit for group %s failed: %s",
+ self.group_id, error_type.__name__)
future.failure(error_type(self.group_id))
return
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError,
Errors.RequestTimedOutError):
- log.info("OffsetCommit failed for group %s due to a"
- " coordinator error (%s), will find new coordinator"
- " and retry", self.group_id, error_type.__name__)
+ log.debug("OffsetCommit for group %s failed: %s",
+ self.group_id, error_type.__name__)
self.coordinator_dead()
future.failure(error_type(self.group_id))
return
@@ -521,22 +524,31 @@ class ConsumerCoordinator(BaseCoordinator):
Errors.RebalanceInProgressError):
# need to re-join group
error = error_type(self.group_id)
- log.error("OffsetCommit failed for group %s due to group"
- " error (%s), will rejoin", self.group_id, error)
+ log.debug("OffsetCommit for group %s failed: %s",
+ self.group_id, error)
self._subscription.mark_for_reassignment()
- # Errors.CommitFailedError("Commit cannot be completed due to group rebalance"))
- future.failure(error)
+ future.failure(Errors.CommitFailedError(
+ "Commit cannot be completed since the group has"
+ " already rebalanced and assigned the partitions to"
+ " another member. This means that the time between"
+ " subsequent calls to poll() was longer than the"
+ " configured session.timeout.ms, which typically"
+ " implies that the poll loop is spending too much time"
+ " message processing. You can address this either by"
+ " increasing the session timeout or by reducing the"
+ " maximum size of batches returned in poll() with"
+ " max.poll.records."))
return
else:
- log.error("OffsetCommit failed for group % on partition %s"
- " with offset %s: %s", self.group_id, tp, offset,
+ log.error("Group %s failed to commit partition %s at offset"
+ " %s: %s", self.group_id, tp, offset,
error_type.__name__)
future.failure(error_type())
return
if unauthorized_topics:
- log.error("OffsetCommit failed for unauthorized topics %s",
- unauthorized_topics)
+ log.error("Not authorized to commit to topics %s for group %s",
+ unauthorized_topics, self.group_id)
future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
else:
future.success(True)
@@ -573,7 +585,8 @@ class ConsumerCoordinator(BaseCoordinator):
node_id)
return Future().failure(Errors.NodeNotReadyError)
- log.debug("Fetching committed offsets for partitions: %s", partitions)
+ log.debug("Group %s fetching committed offsets for partitions: %s",
+ self.group_id, partitions)
# construct the request
topic_partitions = collections.defaultdict(set)
for tp in partitions:
@@ -605,7 +618,8 @@ class ConsumerCoordinator(BaseCoordinator):
error_type = Errors.for_code(error_code)
if error_type is not Errors.NoError:
error = error_type()
- log.debug("Error fetching offset for %s: %s", tp, error_type())
+ log.debug("Group %s failed to fetch offset for partition"
+ " %s: %s", self.group_id, tp, error)
if error_type is Errors.GroupLoadInProgressError:
# just retry
future.failure(error)
@@ -629,10 +643,12 @@ class ConsumerCoordinator(BaseCoordinator):
future.failure(error)
return
elif offset >= 0:
- # record the position with the offset (-1 indicates no committed offset to fetch)
+ # record the position with the offset
+ # (-1 indicates no committed offset to fetch)
offsets[tp] = OffsetAndMetadata(offset, metadata)
else:
- log.debug("No committed offset for partition %s", tp)
+ log.debug("Group %s has no committed offset for partition"
+ " %s", self.group_id, tp)
future.success(offsets)
@@ -669,8 +685,8 @@ class AutoCommitTask(object):
return
if self._coordinator.coordinator_unknown():
- log.debug("Cannot auto-commit offsets because the coordinator is"
- " unknown, will retry after backoff")
+ log.debug("Cannot auto-commit offsets for group %s because the"
+ " coordinator is unknown", self._coordinator.group_id)
backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
self._client.schedule(self, time.time() + backoff)
return
@@ -683,18 +699,21 @@ class AutoCommitTask(object):
def _handle_commit_response(self, offsets, result):
self._request_in_flight = False
if result is True:
- log.debug("Successfully auto-committed offsets")
+ log.debug("Successfully auto-committed offsets for group %s",
+ self._coordinator.group_id)
next_at = time.time() + self._interval
elif not isinstance(result, BaseException):
raise Errors.IllegalStateError(
'Unrecognized result in _handle_commit_response: %s'
% result)
elif hasattr(result, 'retriable') and result.retriable:
- log.debug("Failed to auto-commit offsets: %s, will retry"
- " immediately", result)
+ log.debug("Failed to auto-commit offsets for group %s: %s,"
+ " will retry immediately", self._coordinator.group_id,
+ result)
next_at = time.time()
else:
- log.warning("Auto offset commit failed: %s", result)
+ log.warning("Auto offset commit failed for group %s: %s",
+ self._coordinator.group_id, result)
next_at = time.time() + self._interval
if not self._enabled:
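
For context on the AutoCommitTask hunks above: the task reschedules itself on the client's timer based on the commit outcome -- success waits a full auto-commit interval, a retriable error retries immediately, an unknown coordinator backs off by retry_backoff_ms, and a fatal error logs a warning and waits for the next interval. A standalone sketch of that scheduling decision (the interval and backoff values are assumptions for the sketch, not taken from the patch):

    import time

    AUTO_COMMIT_INTERVAL = 5.0   # seconds; assumed default for the sketch
    RETRY_BACKOFF = 0.1          # seconds; stands in for retry_backoff_ms

    def next_run_at(result, coordinator_known=True, now=None):
        """Pick the next time the auto-commit task should fire."""
        now = time.time() if now is None else now
        if not coordinator_known:
            return now + RETRY_BACKOFF          # wait out coordinator discovery
        if result is True:
            return now + AUTO_COMMIT_INTERVAL   # committed; wait a full interval
        if getattr(result, 'retriable', False):
            return now                          # retriable failure; retry immediately
        return now + AUTO_COMMIT_INTERVAL       # fatal; warn and try next interval
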