author     Dana Powers <dana.powers@gmail.com>    2016-04-05 00:07:15 -0700
committer  Dana Powers <dana.powers@gmail.com>    2016-04-05 10:22:54 -0700
commit     145ac227cb7f471467de52c5016ed3727e417911 (patch)
tree       799ba40ae175f0bc6e5c5013991805eb6dab5fd1 /kafka/coordinator
parent     5a14bd8c947251d1a8f848175cc3cf2b07af3411 (diff)
download   kafka-python-145ac227cb7f471467de52c5016ed3727e417911.tar.gz
KAFKA-3318: clean up consumer logging and error messages
Diffstat (limited to 'kafka/coordinator')
-rw-r--r--  kafka/coordinator/base.py      |  78
-rw-r--r--  kafka/coordinator/consumer.py  | 103
2 files changed, 103 insertions, 78 deletions
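The changes below mostly re-tier log statements: lifecycle events (joining, rebalancing, coordinator discovery) move up to INFO, per-attempt detail drops to DEBUG, and nearly every message now names its group. A minimal sketch of surfacing both tiers in an application, assuming the logger names follow kafka-python's module paths (logging.getLogger(__name__)):

    import logging

    logging.basicConfig(
        format='%(asctime)s %(levelname)s %(name)s %(message)s',
        level=logging.INFO,  # lifecycle: joins, rebalances, coordinator discovery
    )
    # Request/response detail and retriable errors now log at DEBUG; enable
    # them for the coordinator layer only, keeping the rest of the client quiet.
    logging.getLogger('kafka.coordinator').setLevel(logging.DEBUG)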
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index fcf3901..3c7ea21 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -200,7 +200,7 @@ class BaseCoordinator(object):
                 self._client.poll()
                 continue
 
-            future = self._send_group_metadata_request()
+            future = self._send_group_coordinator_request()
             self._client.poll(future=future)
 
             if future.failed():
@@ -233,7 +233,7 @@ class BaseCoordinator(object):
         while self.need_rejoin():
             self.ensure_coordinator_known()
 
-            future = self._perform_group_join()
+            future = self._send_join_group_request()
             self._client.poll(future=future)
 
             if future.succeeded():
@@ -253,7 +253,7 @@ class BaseCoordinator(object):
                 raise exception # pylint: disable-msg=raising-bad-type
             time.sleep(self.config['retry_backoff_ms'] / 1000.0)
 
-    def _perform_group_join(self):
+    def _send_join_group_request(self):
         """Join the group and return the assignment for the next generation.
 
         This function handles both JoinGroup and SyncGroup, delegating to
@@ -268,7 +268,7 @@ class BaseCoordinator(object):
             return Future().failure(e)
 
         # send a join group request to the coordinator
-        log.debug("(Re-)joining group %s", self.group_id)
+        log.info("(Re-)joining group %s", self.group_id)
         request = JoinGroupRequest(
             self.group_id,
             self.config['session_timeout_ms'],
@@ -279,7 +279,7 @@ class BaseCoordinator(object):
              for protocol, metadata in self.group_protocols()])
 
         # create the request for the coordinator
-        log.debug("Issuing request (%s) to coordinator %s", request, self.coordinator_id)
+        log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
         future = Future()
         _f = self._client.send(self.coordinator_id, request)
         _f.add_callback(self._handle_join_group_response, future)
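Every request in this file follows the pattern visible above: _client.send() returns an inner future that completes with the raw response, and a handler bridges it to an outer future the caller polls on. A simplified, self-contained sketch of that bridge — not kafka-python's actual Future class, all names illustrative:

    class Future(object):
        def __init__(self):
            self.is_done = False
            self.value = None
            self.exception = None
            self._callbacks = []

        def success(self, value):
            self.is_done, self.value = True, value
            for f in self._callbacks:
                f(value)
            return self

        def failure(self, e):
            self.is_done, self.exception = True, e
            return self

        def add_callback(self, f, *args):
            # Extra positional args are bound ahead of the eventual value,
            # matching add_callback(self._handle_join_group_response, future).
            self._callbacks.append(lambda value: f(*(args + (value,))))
            return self

    def handle_join_group_response(outer, response):
        # Translate the raw response into success/failure on the outer future.
        if response['error_code'] == 0:
            outer.success(response['member_id'])
        else:
            outer.failure(Exception('JoinGroup error %d' % response['error_code']))

    outer, inner = Future(), Future()
    inner.add_callback(handle_join_group_response, outer)
    inner.success({'error_code': 0, 'member_id': 'consumer-1'})
    assert outer.is_done and outer.value == 'consumer-1'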
@@ -300,6 +300,8 @@ class BaseCoordinator(object):
     def _handle_join_group_response(self, future, response):
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
+            log.debug("Received successful JoinGroup response for group %s: %s",
+                      self.group_id, response)
             self.member_id = response.member_id
             self.generation = response.generation_id
             self.rejoin_needed = False
@@ -315,30 +317,31 @@ class BaseCoordinator(object):
                 self._on_join_follower().chain(future)
 
         elif error_type is Errors.GroupLoadInProgressError:
-            log.debug("Attempt to join group %s rejected since coordinator is"
-                      " loading the group.", self.group_id)
+            log.debug("Attempt to join group %s rejected since coordinator %s"
+                      " is loading the group.", self.group_id, self.coordinator_id)
             # backoff and retry
             future.failure(error_type(response))
         elif error_type is Errors.UnknownMemberIdError:
             # reset the member id and retry immediately
             error = error_type(self.member_id)
             self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
-            log.info("Attempt to join group %s failed due to unknown member id,"
-                     " resetting and retrying.", self.group_id)
+            log.debug("Attempt to join group %s failed due to unknown member id",
+                      self.group_id)
             future.failure(error)
         elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                             Errors.NotCoordinatorForGroupError):
             # re-discover the coordinator and retry with backoff
             self.coordinator_dead()
-            log.info("Attempt to join group %s failed due to obsolete "
-                     "coordinator information, retrying.", self.group_id)
+            log.debug("Attempt to join group %s failed due to obsolete "
+                      "coordinator information: %s", self.group_id,
+                      error_type.__name__)
             future.failure(error_type())
         elif error_type in (Errors.InconsistentGroupProtocolError,
                             Errors.InvalidSessionTimeoutError,
                             Errors.InvalidGroupIdError):
             # log the error and re-throw the exception
             error = error_type(response)
-            log.error("Attempt to join group %s failed due to: %s",
+            log.error("Attempt to join group %s failed due to fatal error: %s",
                       self.group_id, error)
             future.failure(error)
         elif error_type is Errors.GroupAuthorizationFailedError:
@@ -356,8 +359,8 @@ class BaseCoordinator(object):
             self.generation,
             self.member_id,
             {})
-        log.debug("Issuing follower SyncGroup (%s) to coordinator %s",
-                  request, self.coordinator_id)
+        log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
+                  self.group_id, self.coordinator_id, request)
         return self._send_sync_group_request(request)
 
     def _on_join_leader(self, response):
@@ -386,8 +389,8 @@ class BaseCoordinator(object):
                 assignment if isinstance(assignment, bytes) else assignment.encode())
              for member_id, assignment in six.iteritems(group_assignment)])
 
-        log.debug("Issuing leader SyncGroup (%s) to coordinator %s",
-                  request, self.coordinator_id)
+        log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s",
+                  self.group_id, self.coordinator_id, request)
         return self._send_sync_group_request(request)
 
     def _send_sync_group_request(self, request):
@@ -404,8 +407,8 @@ class BaseCoordinator(object):
     def _handle_sync_group_response(self, future, response):
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            log.debug("Received successful sync group response for group %s: %s",
-                      self.group_id, response)
+            log.info("Successfully joined group %s with generation %s",
+                     self.group_id, self.generation)
             #self.sensors.syncLatency.record(response.requestLatencyMs())
             future.success(response.member_assignment)
             return
@@ -415,21 +418,19 @@ class BaseCoordinator(object):
         if error_type is Errors.GroupAuthorizationFailedError:
             future.failure(error_type(self.group_id))
         elif error_type is Errors.RebalanceInProgressError:
-            log.info("SyncGroup for group %s failed due to coordinator"
-                     " rebalance, rejoining the group", self.group_id)
+            log.debug("SyncGroup for group %s failed due to coordinator"
+                      " rebalance", self.group_id)
             future.failure(error_type(self.group_id))
         elif error_type in (Errors.UnknownMemberIdError,
                             Errors.IllegalGenerationError):
             error = error_type()
-            log.info("SyncGroup for group %s failed due to %s,"
-                     " rejoining the group", self.group_id, error)
+            log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
             self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
             future.failure(error)
         elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                             Errors.NotCoordinatorForGroupError):
             error = error_type()
-            log.info("SyncGroup for group %s failed due to %s, will find new"
-                     " coordinator and rejoin", self.group_id, error)
+            log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
             self.coordinator_dead()
             future.failure(error)
         else:
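All of these branches hinge on Errors.for_code(response.error_code), which resolves a broker error code to an exception class so handlers can compare classes directly (`if error_type is Errors.NoError:`). A simplified, self-contained sketch of such a registry — codes 0, 15, and 16 match the Kafka protocol, while the real table kafka-python keeps is much larger:

    class NoError(Exception):
        errno = 0

    class GroupCoordinatorNotAvailableError(Exception):
        errno = 15
        retriable = True  # callers may back off and retry

    class NotCoordinatorForGroupError(Exception):
        errno = 16
        retriable = True

    _CODE_TO_ERROR = {cls.errno: cls for cls in
                      (NoError,
                       GroupCoordinatorNotAvailableError,
                       NotCoordinatorForGroupError)}

    def for_code(error_code):
        # Return the class itself, not an instance, so handlers can
        # branch with `is` / `in` checks before constructing the error.
        return _CODE_TO_ERROR[error_code]

    assert for_code(16) is NotCoordinatorForGroupError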
@@ -437,7 +438,7 @@ class BaseCoordinator(object):
             log.error("Unexpected error from SyncGroup: %s", error)
             future.failure(error)
 
-    def _send_group_metadata_request(self):
+    def _send_group_coordinator_request(self):
         """Discover the current coordinator for the group.
 
         Returns:
@@ -447,7 +448,8 @@ class BaseCoordinator(object):
         if node_id is None:
             return Future().failure(Errors.NoBrokersAvailable())
 
-        log.debug("Issuing group metadata request to broker %s", node_id)
+        log.debug("Sending group coordinator request for group %s to broker %s",
+                  self.group_id, node_id)
         request = GroupCoordinatorRequest(self.group_id)
         future = Future()
         _f = self._client.send(node_id, request)
@@ -456,7 +458,7 @@ class BaseCoordinator(object):
         return future
 
     def _handle_group_coordinator_response(self, future, response):
-        log.debug("Group metadata response %s", response)
+        log.debug("Received group coordinator response %s", response)
         if not self.coordinator_unknown():
             # We already found the coordinator, so ignore the request
             log.debug("Coordinator already known -- ignoring metadata response")
@@ -473,6 +475,8 @@ class BaseCoordinator(object):
             return
 
         self.coordinator_id = response.coordinator_id
+        log.info("Discovered coordinator %s for group %s",
+                 self.coordinator_id, self.group_id)
         self._client.ready(self.coordinator_id)
 
         # start sending heartbeats only if we have a valid generation
@@ -495,8 +499,8 @@ class BaseCoordinator(object):
     def coordinator_dead(self, error=None):
         """Mark the current coordinator as dead."""
         if self.coordinator_id is not None:
-            log.warning("Marking the coordinator dead (node %s): %s.",
-                        self.coordinator_id, error)
+            log.warning("Marking the coordinator dead (node %s) for group %s: %s.",
+                        self.coordinator_id, self.group_id, error)
             self.coordinator_id = None
 
     def close(self):
@@ -542,22 +546,24 @@ class BaseCoordinator(object):
         #self.sensors.heartbeat_latency.record(response.requestLatencyMs())
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            log.info("Heartbeat successful")
+            log.debug("Received successful heartbeat response for group %s",
+                      self.group_id)
             future.success(None)
         elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                             Errors.NotCoordinatorForGroupError):
-            log.warning("Heartbeat failed: coordinator is either not started or"
-                        " not valid; will refresh metadata and retry")
+            log.warning("Heartbeat failed for group %s: coordinator (node %s)"
+                        " is either not started or not valid", self.group_id,
+                        self.coordinator_id)
             self.coordinator_dead()
             future.failure(error_type())
         elif error_type is Errors.RebalanceInProgressError:
-            log.warning("Heartbeat: group is rebalancing; this consumer needs to"
-                        " re-join")
+            log.warning("Heartbeat failed for group %s because it is"
+                        " rebalancing", self.group_id)
             self.rejoin_needed = True
             future.failure(error_type())
         elif error_type is Errors.IllegalGenerationError:
-            log.warning("Heartbeat: generation id is not current; this consumer"
-                        " needs to re-join")
+            log.warning("Heartbeat failed for group %s: generation id is not "
+                        " current.", self.group_id)
            self.rejoin_needed = True
            future.failure(error_type())
        elif error_type is Errors.UnknownMemberIdError:
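Before the second file, it helps to summarize what the reworded heartbeat warnings map to operationally. A self-contained sketch restating the branches above as a table — the helper and its strings are illustrative, not library API:

    HEARTBEAT_RECOVERY = {
        'GroupCoordinatorNotAvailableError': 'mark coordinator dead; rediscover it',
        'NotCoordinatorForGroupError': 'mark coordinator dead; rediscover it',
        'RebalanceInProgressError': 'set rejoin_needed; rejoin the group',
        'IllegalGenerationError': 'set rejoin_needed; rejoin the group',
    }

    def recovery_action(error_name):
        # Errors outside the table (e.g. authorization failures) are
        # surfaced to the caller rather than retried.
        return HEARTBEAT_RECOVERY.get(error_name, 'fatal: raise to caller')

    assert recovery_action('RebalanceInProgressError').startswith('set rejoin_needed')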
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index ae2344f..3ce7570 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -198,15 +198,18 @@ class ConsumerCoordinator(BaseCoordinator):
             self._auto_commit_task.enable()
 
         assigned = set(self._subscription.assigned_partitions())
-        log.debug("Set newly assigned partitions %s", assigned)
+        log.info("Setting newly assigned partitions %s for group %s",
+                 assigned, self.group_id)
 
         # execute the user's callback after rebalance
         if self._subscription.listener:
             try:
                 self._subscription.listener.on_partitions_assigned(assigned)
             except Exception:
-                log.exception("User provided listener failed on partition"
-                              " assignment: %s", assigned)
+                log.exception("User provided listener %s for group %s"
+                              " failed on partition assignment: %s",
+                              self._subscription.listener, self.group_id,
+                              assigned)
 
     def _perform_assignment(self, leader_id, assignment_strategy, members):
         assignor = self._lookup_assignor(assignment_strategy)
@@ -226,12 +229,13 @@ class ConsumerCoordinator(BaseCoordinator):
         self._subscription.group_subscribe(all_subscribed_topics)
         self._client.set_topics(self._subscription.group_subscription())
 
-        log.debug("Performing %s assignment for subscriptions %s",
-                  assignor.name, member_metadata)
+        log.debug("Performing assignment for group %s using strategy %s"
+                  " with subscriptions %s", self.group_id, assignor.name,
+                  member_metadata)
 
         assignments = assignor.assign(self._cluster, member_metadata)
 
-        log.debug("Finished assignment: %s", assignments)
+        log.debug("Finished assignment for group %s: %s", self.group_id, assignments)
 
         group_assignment = {}
         for member_id, assignment in six.iteritems(assignments):
@@ -243,15 +247,16 @@ class ConsumerCoordinator(BaseCoordinator):
         self._maybe_auto_commit_offsets_sync()
 
         # execute the user's callback before rebalance
-        log.debug("Revoking previously assigned partitions %s",
-                  self._subscription.assigned_partitions())
+        log.info("Revoking previously assigned partitions %s for group %s",
+                 self._subscription.assigned_partitions(), self.group_id)
         if self._subscription.listener:
             try:
                 revoked = set(self._subscription.assigned_partitions())
                 self._subscription.listener.on_partitions_revoked(revoked)
             except Exception:
-                log.exception("User provided subscription listener failed"
-                              " on_partitions_revoked")
+                log.exception("User provided subscription listener %s"
+                              " for group %s failed on_partitions_revoked",
+                              self._subscription.listener, self.group_id)
 
         self._subscription.mark_for_reassignment()
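The listener guarded by the exception handlers above is user code registered at subscribe time. A minimal sketch of wiring one up — topic, group id, and broker address are placeholders, and the import path is an assumption about this era's module layout:

    from kafka import KafkaConsumer
    from kafka.consumer.subscription_state import ConsumerRebalanceListener

    class LoggingRebalanceListener(ConsumerRebalanceListener):
        def on_partitions_revoked(self, revoked):
            # Runs before the rebalance; flush or commit application state here.
            print('revoked: %s' % revoked)

        def on_partitions_assigned(self, assigned):
            # Runs after the rebalance, once the new assignment is known.
            print('assigned: %s' % assigned)

    consumer = KafkaConsumer(group_id='my-group',
                             bootstrap_servers='localhost:9092')
    consumer.subscribe(['my-topic'], listener=LoggingRebalanceListener())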
@@ -462,8 +467,8 @@ class ConsumerCoordinator(BaseCoordinator):
             ) for topic, partitions in six.iteritems(offset_data)]
         )
 
-        log.debug("Sending offset-commit request with %s to %s",
-                  offsets, node_id)
+        log.debug("Sending offset-commit request with %s for group %s to %s",
+                  offsets, self.group_id, node_id)
 
         future = Future()
         _f = self._client.send(node_id, request)
@@ -482,12 +487,13 @@ class ConsumerCoordinator(BaseCoordinator):
                 error_type = Errors.for_code(error_code)
 
                 if error_type is Errors.NoError:
-                    log.debug("Committed offset %s for partition %s", offset, tp)
+                    log.debug("Group %s committed offset %s for partition %s",
+                              self.group_id, offset, tp)
                     if self._subscription.is_assigned(tp):
                         self._subscription.assignment[tp].committed = offset.offset
                 elif error_type is Errors.GroupAuthorizationFailedError:
-                    log.error("OffsetCommit failed for group %s - %s",
-                              self.group_id, error_type.__name__)
+                    log.error("Not authorized to commit offsets for group %s",
+                              self.group_id)
                     future.failure(error_type(self.group_id))
                     return
                 elif error_type is Errors.TopicAuthorizationFailedError:
@@ -495,24 +501,21 @@ class ConsumerCoordinator(BaseCoordinator):
                 elif error_type in (Errors.OffsetMetadataTooLargeError,
                                     Errors.InvalidCommitOffsetSizeError):
                     # raise the error to the user
-                    log.info("OffsetCommit failed for group %s on partition %s"
-                             " due to %s, will retry", self.group_id, tp,
-                             error_type.__name__)
+                    log.debug("OffsetCommit for group %s failed on partition %s"
+                              " %s", self.group_id, tp, error_type.__name__)
                     future.failure(error_type())
                     return
                 elif error_type is Errors.GroupLoadInProgressError:
                     # just retry
-                    log.info("OffsetCommit failed for group %s because group is"
-                             " initializing (%s), will retry", self.group_id,
-                             error_type.__name__)
+                    log.debug("OffsetCommit for group %s failed: %s",
+                              self.group_id, error_type.__name__)
                     future.failure(error_type(self.group_id))
                     return
                 elif error_type in (Errors.GroupCoordinatorNotAvailableError,
                                     Errors.NotCoordinatorForGroupError,
                                     Errors.RequestTimedOutError):
-                    log.info("OffsetCommit failed for group %s due to a"
-                             " coordinator error (%s), will find new coordinator"
-                             " and retry", self.group_id, error_type.__name__)
+                    log.debug("OffsetCommit for group %s failed: %s",
+                              self.group_id, error_type.__name__)
                    self.coordinator_dead()
                    future.failure(error_type(self.group_id))
                    return
@@ -521,22 +524,31 @@ class ConsumerCoordinator(BaseCoordinator):
                                     Errors.RebalanceInProgressError):
                     # need to re-join group
                     error = error_type(self.group_id)
-                    log.error("OffsetCommit failed for group %s due to group"
-                              " error (%s), will rejoin", self.group_id, error)
+                    log.debug("OffsetCommit for group %s failed: %s",
+                              self.group_id, error)
                     self._subscription.mark_for_reassignment()
-                    # Errors.CommitFailedError("Commit cannot be completed due to group rebalance"))
-                    future.failure(error)
+                    future.failure(Errors.CommitFailedError(
+                        "Commit cannot be completed since the group has"
+                        " already rebalanced and assigned the partitions to"
+                        " another member. This means that the time between"
+                        " subsequent calls to poll() was longer than the"
+                        " configured session.timeout.ms, which typically"
+                        " implies that the poll loop is spending too much time"
+                        " message processing. You can address this either by"
+                        " increasing the session timeout or by reducing the"
+                        " maximum size of batches returned in poll() with"
+                        " max.poll.records."))
                     return
                 else:
-                    log.error("OffsetCommit failed for group % on partition %s"
-                              " with offset %s: %s", self.group_id, tp, offset,
+                    log.error("Group %s failed to commit partition %s at offset"
+                              " %s: %s", self.group_id, tp, offset,
                               error_type.__name__)
                     future.failure(error_type())
                     return
 
         if unauthorized_topics:
-            log.error("OffsetCommit failed for unauthorized topics %s",
-                      unauthorized_topics)
+            log.error("Not authorized to commit to topics %s for group %s",
+                      unauthorized_topics, self.group_id)
             future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
         else:
             future.success(True)
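The new CommitFailedError text names session.timeout.ms and max.poll.records. In kafka-python the session timeout is a KafkaConsumer constructor option; whether a max.poll.records equivalent exists at this commit is not shown here, so this sketch (topic, group, and broker are placeholders) leans on the session timeout:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my-topic',
        group_id='my-group',
        bootstrap_servers='localhost:9092',
        session_timeout_ms=30000,  # allow more processing time between poll() calls
    )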
@@ -573,7 +585,8 @@ class ConsumerCoordinator(BaseCoordinator):
                       node_id)
             return Future().failure(Errors.NodeNotReadyError)
 
-        log.debug("Fetching committed offsets for partitions: %s", partitions)
+        log.debug("Group %s fetching committed offsets for partitions: %s",
+                  self.group_id, partitions)
         # construct the request
         topic_partitions = collections.defaultdict(set)
         for tp in partitions:
@@ -605,7 +618,8 @@ class ConsumerCoordinator(BaseCoordinator):
                 error_type = Errors.for_code(error_code)
                 if error_type is not Errors.NoError:
                     error = error_type()
-                    log.debug("Error fetching offset for %s: %s", tp, error_type())
+                    log.debug("Group %s failed to fetch offset for partition"
+                              " %s: %s", self.group_id, tp, error)
                     if error_type is Errors.GroupLoadInProgressError:
                         # just retry
                         future.failure(error)
@@ -629,10 +643,12 @@ class ConsumerCoordinator(BaseCoordinator):
                     future.failure(error)
                     return
                 elif offset >= 0:
-                    # record the position with the offset (-1 indicates no committed offset to fetch)
+                    # record the position with the offset
+                    # (-1 indicates no committed offset to fetch)
                     offsets[tp] = OffsetAndMetadata(offset, metadata)
                 else:
-                    log.debug("No committed offset for partition %s", tp)
+                    log.debug("Group %s has no committed offset for partition"
+                              " %s", self.group_id, tp)
 
         future.success(offsets)
@@ -669,8 +685,8 @@ class AutoCommitTask(object):
             return
 
         if self._coordinator.coordinator_unknown():
-            log.debug("Cannot auto-commit offsets because the coordinator is"
-                      " unknown, will retry after backoff")
+            log.debug("Cannot auto-commit offsets for group %s because the"
+                      " coordinator is unknown", self._coordinator.group_id)
             backoff = self._coordinator.config['retry_backoff_ms'] / 1000.0
             self._client.schedule(self, time.time() + backoff)
             return
@@ -683,18 +699,21 @@ class AutoCommitTask(object):
     def _handle_commit_response(self, offsets, result):
         self._request_in_flight = False
         if result is True:
-            log.debug("Successfully auto-committed offsets")
+            log.debug("Successfully auto-committed offsets for group %s",
+                      self._coordinator.group_id)
             next_at = time.time() + self._interval
         elif not isinstance(result, BaseException):
             raise Errors.IllegalStateError(
                 'Unrecognized result in _handle_commit_response: %s'
                 % result)
         elif hasattr(result, 'retriable') and result.retriable:
-            log.debug("Failed to auto-commit offsets: %s, will retry"
-                      " immediately", result)
+            log.debug("Failed to auto-commit offsets for group %s: %s,"
+                      " will retry immediately", self._coordinator.group_id,
+                      result)
             next_at = time.time()
         else:
-            log.warning("Auto offset commit failed: %s", result)
+            log.warning("Auto offset commit failed for group %s: %s",
+                        self._coordinator.group_id, result)
             next_at = time.time() + self._interval
 
         if not self._enabled:
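The AutoCommitTask messages above fire only when auto-commit is enabled. A minimal sketch of the consumer settings that drive the task, with placeholder topic, group, and broker values:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my-topic',
        group_id='my-group',
        bootstrap_servers='localhost:9092',
        enable_auto_commit=True,       # schedules the periodic auto-commit task
        auto_commit_interval_ms=5000,  # the _interval used to compute next_at
    )
    for message in consumer:
        # offsets are committed in the background while messages are consumed
        print(message.topic, message.partition, message.offset)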