author     Omar Ghishan <omar.ghishan@rd.io>  2014-01-03 15:49:55 -0800
committer  Omar Ghishan <omar.ghishan@rd.io>  2014-01-06 15:14:51 -0800
commit     bbd90e12ffd83e7ed845c488e21a7155c25f5b82 (patch)
tree       e3963187fa7f8d2b7221210029b50bbacc9d0cb4 /kafka/consumer.py
parent     8540f1f3b6b07f9ddb28d3ade78679a0ac2d4355 (diff)
download   kafka-python-bbd90e12ffd83e7ed845c488e21a7155c25f5b82.tar.gz
Add a limit to fetch buffer size, and actually retry requests when fetch size is too small
Note: This can cause fetching a message to exceed a given timeout, but timeouts are not guaranteed anyway, and in that case it is the client's fault for not providing a large enough buffer size, rather than the Kafka server's. This can be bad if max_buffer_size is None (no limit) and some message in Kafka is enormous, which is why a max_buffer_size should be set.
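A minimal usage sketch of the new knob (only buffer_size and max_buffer_size come from this commit; the broker address, group and topic names, and the era-appropriate KafkaClient constructor are assumptions for illustration):

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

# Assumed setup; only the two buffer arguments below are from this commit.
kafka = KafkaClient("localhost", 9092)

# Start with a 4 KiB fetch buffer and never advertise more than 64 KiB.
# On ConsumerFetchSizeTooSmall the consumer doubles the buffer and
# retries; once the cap is reached, the exception propagates instead.
consumer = SimpleConsumer(kafka, "my-group", "my-topic",
                          buffer_size=4096,
                          max_buffer_size=64 * 1024)

# buffer_size greater than max_buffer_size is rejected up front:
#   SimpleConsumer(kafka, "my-group", "my-topic",
#                  buffer_size=8192, max_buffer_size=4096)  # ValueError

# max_buffer_size=None removes the cap; per the note above, one huge
# message can then grow the buffer without bound.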
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--  kafka/consumer.py  95
1 file changed, 58 insertions, 37 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 8cf2760..29529d6 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -23,6 +23,7 @@ FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
FETCH_BUFFER_SIZE_BYTES = 4096
+MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
@@ -211,8 +212,10 @@ class SimpleConsumer(Consumer):
     auto_commit_every_t: default 5000. How much time (in milliseconds) to
                          wait before commit
     fetch_size_bytes: number of bytes to request in a FetchRequest
-    buffer_size: initial number of bytes to tell kafka we have
-                 available. This will double every time it's not enough
+    buffer_size: default 4K. Initial number of bytes to tell kafka we
+                 have available. This will double as needed.
+    max_buffer_size: default 32K. Max number of bytes to tell kafka we have
+                     available. None means no limit.
     iter_timeout: default None. How much time (in seconds) to wait for a
                   message in the iterator before exiting. None means no
                   timeout, so it will wait forever.
@@ -228,9 +231,15 @@ class SimpleConsumer(Consumer):
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
                  fetch_size_bytes=FETCH_MIN_BYTES,
                  buffer_size=FETCH_BUFFER_SIZE_BYTES,
+                 max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
                  iter_timeout=None):
+        if max_buffer_size is not None and buffer_size > max_buffer_size:
+            raise ValueError("buffer_size (%d) is greater than "
+                             "max_buffer_size (%d)" %
+                             (buffer_size, max_buffer_size))
         self.buffer_size = buffer_size
+        self.max_buffer_size = max_buffer_size
         self.partition_info = False  # Do not return partition info in msgs
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
@@ -353,42 +362,54 @@ class SimpleConsumer(Consumer):
         # Create fetch request payloads for all the partitions
-        requests = []
         partitions = self.offsets.keys()
-        for partition in partitions:
-            requests.append(FetchRequest(self.topic, partition,
-                                         self.offsets[partition],
-                                         self.buffer_size))
-        # Send request
-        responses = self.client.send_fetch_request(
-            requests,
-            max_wait_time=int(self.fetch_max_wait_time),
-            min_bytes=self.fetch_min_bytes)
-
-        for resp in responses:
-            partition = resp.partition
-            try:
-                for message in resp.messages:
-                    # Update partition offset
-                    self.offsets[partition] = message.offset + 1
-
-                    # Count, check and commit messages if necessary
-                    self.count_since_commit += 1
-                    self._auto_commit()
-
-                    # Put the message in our queue
-                    if self.partition_info:
-                        self.queue.put((partition, message))
-                    else:
-                        self.queue.put(message)
-            except ConsumerFetchSizeTooSmall, e:
-                self.buffer_size *= 2
-                log.warn("Fetch size too small, increase to %d (2x) and retry",
-                         self.buffer_size)
-            except ConsumerNoMoreData, e:
-                log.debug("Iteration was ended by %r", e)
-            except StopIteration:
-                # Stop iterating through this partition
-                log.debug("Done iterating over partition %s" % partition)
-
+        while partitions:
+            requests = []
+            for partition in partitions:
+                requests.append(FetchRequest(self.topic, partition,
+                                             self.offsets[partition],
+                                             self.buffer_size))
+            # Send request
+            responses = self.client.send_fetch_request(
+                requests,
+                max_wait_time=int(self.fetch_max_wait_time),
+                min_bytes=self.fetch_min_bytes)
+
+            retry_partitions = set()
+            for resp in responses:
+                partition = resp.partition
+                try:
+                    for message in resp.messages:
+                        # Update partition offset
+                        self.offsets[partition] = message.offset + 1
+
+                        # Count, check and commit messages if necessary
+                        self.count_since_commit += 1
+                        self._auto_commit()
+
+                        # Put the message in our queue
+                        if self.partition_info:
+                            self.queue.put((partition, message))
+                        else:
+                            self.queue.put(message)
+                except ConsumerFetchSizeTooSmall, e:
+                    if (self.max_buffer_size is not None and
+                        self.buffer_size == self.max_buffer_size):
+                        log.error("Max fetch size %d too small",
+                                  self.max_buffer_size)
+                        raise e
+                    if self.max_buffer_size is None:
+                        self.buffer_size *= 2
+                    else:
+                        self.buffer_size = min(self.buffer_size * 2,
+                                               self.max_buffer_size)
+                    log.warn("Fetch size too small, increase to %d (2x) "
+                             "and retry", self.buffer_size)
+                    retry_partitions.add(partition)
+                except ConsumerNoMoreData, e:
+                    log.debug("Iteration was ended by %r", e)
+                except StopIteration:
+                    # Stop iterating through this partition
+                    log.debug("Done iterating over partition %s" % partition)
+            partitions = retry_partitions
+
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
"""