summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-07 10:54:28 -0700
committerDana Powers <dana.powers@gmail.com>2017-10-08 14:03:00 -0700
commitaf6f17f0a7ba48e4ced28b590ef8cd535c6e9d4b (patch)
tree3e49e238807c17367a8eb5b100e1e845edd3fcb5
parent7305f03ff0758dad811d51f5e21006f273bb4dc2 (diff)
downloadkafka-python-af6f17f0a7ba48e4ced28b590ef8cd535c6e9d4b.tar.gz
WIP
-rw-r--r--kafka/consumer/group.py37
-rw-r--r--kafka/coordinator/consumer.py12
-rw-r--r--kafka/errors.py12
3 files changed, 41 insertions, 20 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index a83d5da..e37be0e 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -125,19 +125,34 @@ class KafkaConsumer(six.Iterator):
distribute partition ownership amongst consumer instances when
group management is used.
Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
+ max_poll_records (int): The maximum number of records returned in a
+ single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500
+ max_poll_interval_ms (int): The maximum delay between invocations of
+ :meth:`~kafka.KafkaConsumer.poll` when using consumer group
+ management. This places an upper bound on the amount of time that
+ the consumer can be idle before fetching more records. If
+ :meth:`~kafka.KafkaConsumer.poll` is not called before expiration
+ of this timeout, then the consumer is considered failed and the
+ group will rebalance in order to reassign the partitions to another
+ member. Default 300000
+ session_timeout_ms (int): The timeout used to detect failures when
+ using Kafka's group management facilities. The consumer sends
+ periodic heartbeats to indicate its liveness to the broker. If
+ no heartbeats are received by the broker before the expiration of
+ this session timeout, then the broker will remove this consumer
+ from the group and initiate a rebalance. Note that the value must
+ be in the allowable range as configured in the broker configuration
+ by group.min.session.timeout.ms and group.max.session.timeout.ms.
+ Default: 10000
heartbeat_interval_ms (int): The expected time in milliseconds
between heartbeats to the consumer coordinator when using
- Kafka's group management feature. Heartbeats are used to ensure
+ Kafka's group management facilities. Heartbeats are used to ensure
that the consumer's session stays active and to facilitate
rebalancing when new consumers join or leave the group. The
value must be set lower than session_timeout_ms, but typically
should be set no higher than 1/3 of that value. It can be
adjusted even lower to control the expected time for normal
rebalances. Default: 3000
- session_timeout_ms (int): The timeout used to detect failures when
- using Kafka's group management facilities. Default: 30000
- max_poll_records (int): The maximum number of records returned in a
- single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on
system defaults). The java client defaults to 32768.
@@ -234,7 +249,7 @@ class KafkaConsumer(six.Iterator):
'fetch_min_bytes': 1,
'fetch_max_bytes': 52428800,
'max_partition_fetch_bytes': 1 * 1024 * 1024,
- 'request_timeout_ms': 40 * 1000,
+ 'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
'retry_backoff_ms': 100,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
@@ -246,9 +261,10 @@ class KafkaConsumer(six.Iterator):
'check_crcs': True,
'metadata_max_age_ms': 5 * 60 * 1000,
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
- 'heartbeat_interval_ms': 3000,
- 'session_timeout_ms': 30000,
'max_poll_records': 500,
+ 'max_poll_interval_ms': 300000,
+ 'session_timeout_ms': 10000,
+ 'heartbeat_interval_ms': 3000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
@@ -584,6 +600,9 @@ class KafkaConsumer(six.Iterator):
Returns:
dict: Map of topic to list of records (may be empty).
"""
+ self._coordinator.poll()
+
+ """
if self._use_consumer_group():
self._coordinator.ensure_coordinator_known()
self._coordinator.ensure_active_group()
@@ -591,6 +610,7 @@ class KafkaConsumer(six.Iterator):
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
+ """
# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
@@ -612,6 +632,7 @@ class KafkaConsumer(six.Iterator):
# Send any new fetches (won't resend pending fetches)
self._fetcher.send_fetches()
+ timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll())
self._client.poll(timeout_ms=timeout_ms)
records, _ = self._fetcher.fetched_records(max_records)
return records
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 84c62df..d517dd7 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -549,17 +549,7 @@ class ConsumerCoordinator(BaseCoordinator):
log.debug("OffsetCommit for group %s failed: %s",
self.group_id, error)
self._subscription.mark_for_reassignment()
- future.failure(Errors.CommitFailedError(
- "Commit cannot be completed since the group has"
- " already rebalanced and assigned the partitions to"
- " another member. This means that the time between"
- " subsequent calls to poll() was longer than the"
- " configured session.timeout.ms, which typically"
- " implies that the poll loop is spending too much time"
- " message processing. You can address this either by"
- " increasing the session timeout or by reducing the"
- " maximum size of batches returned in poll() with"
- " max.poll.records."))
+ future.failure(Errors.CommitFailedError())
return
else:
log.error("Group %s failed to commit partition %s at offset"
diff --git a/kafka/errors.py b/kafka/errors.py
index 35f9d94..f6cbff1 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -55,7 +55,17 @@ class UnrecognizedBrokerVersion(KafkaError):
class CommitFailedError(KafkaError):
- pass
+ def __init__(self, *args, **kwargs):
+ super(CommitFailedError, self).__init__(
+ """Commit cannot be completed since the group has already
+ rebalanced and assigned the partitions to another member.
+ This means that the time between subsequent calls to poll()
+ was longer than the configured max.poll.interval.ms, which
+ typically implies that the poll loop is spending too much
+ time message processing. You can address this either by
+ increasing the session timeout or by reducing the maximum
+ size of batches returned in poll() with max.poll.records.
+ """, *args, **kwargs)
class AuthenticationMethodNotSupported(KafkaError):