diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-10-07 10:54:28 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-10-08 14:03:00 -0700 |
commit | af6f17f0a7ba48e4ced28b590ef8cd535c6e9d4b (patch) | |
tree | 3e49e238807c17367a8eb5b100e1e845edd3fcb5 | |
parent | 7305f03ff0758dad811d51f5e21006f273bb4dc2 (diff) | |
download | kafka-python-af6f17f0a7ba48e4ced28b590ef8cd535c6e9d4b.tar.gz |
WIP
-rw-r--r-- | kafka/consumer/group.py | 37 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 12 | ||||
-rw-r--r-- | kafka/errors.py | 12 |
3 files changed, 41 insertions, 20 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a83d5da..e37be0e 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -125,19 +125,34 @@ class KafkaConsumer(six.Iterator): distribute partition ownership amongst consumer instances when group management is used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor] + max_poll_records (int): The maximum number of records returned in a + single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500 + max_poll_interval_ms (int): The maximum delay between invocations of + :meth:`~kafka.KafkaConsumer.poll` when using consumer group + management. This places an upper bound on the amount of time that + the consumer can be idle before fetching more records. If + :meth:`~kafka.KafkaConsumer.poll` is not called before expiration + of this timeout, then the consumer is considered failed and the + group will rebalance in order to reassign the partitions to another + member. Default 300000 + session_timeout_ms (int): The timeout used to detect failures when + using Kafka's group management facilities. The consumer sends + periodic heartbeats to indicate its liveness to the broker. If + no heartbeats are received by the broker before the expiration of + this session timeout, then the broker will remove this consumer + from the group and initiate a rebalance. Note that the value must + be in the allowable range as configured in the broker configuration + by group.min.session.timeout.ms and group.max.session.timeout.ms. + Default: 10000 heartbeat_interval_ms (int): The expected time in milliseconds between heartbeats to the consumer coordinator when using - Kafka's group management feature. Heartbeats are used to ensure + Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session_timeout_ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. Default: 3000 - session_timeout_ms (int): The timeout used to detect failures when - using Kafka's group management facilities. Default: 30000 - max_poll_records (int): The maximum number of records returned in a - single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500 receive_buffer_bytes (int): The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). The java client defaults to 32768. @@ -234,7 +249,7 @@ class KafkaConsumer(six.Iterator): 'fetch_min_bytes': 1, 'fetch_max_bytes': 52428800, 'max_partition_fetch_bytes': 1 * 1024 * 1024, - 'request_timeout_ms': 40 * 1000, + 'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms 'retry_backoff_ms': 100, 'reconnect_backoff_ms': 50, 'reconnect_backoff_max_ms': 1000, @@ -246,9 +261,10 @@ class KafkaConsumer(six.Iterator): 'check_crcs': True, 'metadata_max_age_ms': 5 * 60 * 1000, 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor), - 'heartbeat_interval_ms': 3000, - 'session_timeout_ms': 30000, 'max_poll_records': 500, + 'max_poll_interval_ms': 300000, + 'session_timeout_ms': 10000, + 'heartbeat_interval_ms': 3000, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], @@ -584,6 +600,9 @@ class KafkaConsumer(six.Iterator): Returns: dict: Map of topic to list of records (may be empty). """ + self._coordinator.poll() + + """ if self._use_consumer_group(): self._coordinator.ensure_coordinator_known() self._coordinator.ensure_active_group() @@ -591,6 +610,7 @@ class KafkaConsumer(six.Iterator): # 0.8.2 brokers support kafka-backed offset storage via group coordinator elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2): self._coordinator.ensure_coordinator_known() + """ # Fetch positions if we have partitions we're subscribed to that we # don't know the offset for @@ -612,6 +632,7 @@ class KafkaConsumer(six.Iterator): # Send any new fetches (won't resend pending fetches) self._fetcher.send_fetches() + timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll()) self._client.poll(timeout_ms=timeout_ms) records, _ = self._fetcher.fetched_records(max_records) return records diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 84c62df..d517dd7 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -549,17 +549,7 @@ class ConsumerCoordinator(BaseCoordinator): log.debug("OffsetCommit for group %s failed: %s", self.group_id, error) self._subscription.mark_for_reassignment() - future.failure(Errors.CommitFailedError( - "Commit cannot be completed since the group has" - " already rebalanced and assigned the partitions to" - " another member. This means that the time between" - " subsequent calls to poll() was longer than the" - " configured session.timeout.ms, which typically" - " implies that the poll loop is spending too much time" - " message processing. You can address this either by" - " increasing the session timeout or by reducing the" - " maximum size of batches returned in poll() with" - " max.poll.records.")) + future.failure(Errors.CommitFailedError()) return else: log.error("Group %s failed to commit partition %s at offset" diff --git a/kafka/errors.py b/kafka/errors.py index 35f9d94..f6cbff1 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -55,7 +55,17 @@ class UnrecognizedBrokerVersion(KafkaError): class CommitFailedError(KafkaError): - pass + def __init__(self, *args, **kwargs): + super(CommitFailedError, self).__init__( + """Commit cannot be completed since the group has already + rebalanced and assigned the partitions to another member. + This means that the time between subsequent calls to poll() + was longer than the configured max.poll.interval.ms, which + typically implies that the poll loop is spending too much + time message processing. You can address this either by + increasing the session timeout or by reducing the maximum + size of batches returned in poll() with max.poll.records. + """, *args, **kwargs) class AuthenticationMethodNotSupported(KafkaError): |