diff options
author | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-03 15:49:55 -0800 |
---|---|---|
committer | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-06 15:14:51 -0800 |
commit | bbd90e12ffd83e7ed845c488e21a7155c25f5b82 (patch) | |
tree | e3963187fa7f8d2b7221210029b50bbacc9d0cb4 /kafka/consumer.py | |
parent | 8540f1f3b6b07f9ddb28d3ade78679a0ac2d4355 (diff) | |
download | kafka-python-bbd90e12ffd83e7ed845c488e21a7155c25f5b82.tar.gz |
Add a limit to fetch buffer size, and actually retry requests when fetch size is too small
Note: This can cause fetching a message to exceed a given timeout; however, timeouts are not guaranteed anyway, and in this case it is the client's fault for not sending a big enough buffer size rather than the Kafka server's. This can be bad if max_fetch_size is None (no limit) and some message in Kafka is extremely large — which is why we should have some max_fetch_size.
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 95 |
1 file changed, 58 insertions, 37 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 8cf2760..29529d6 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -23,6 +23,7 @@ FETCH_DEFAULT_BLOCK_TIMEOUT = 1 FETCH_MAX_WAIT_TIME = 100 FETCH_MIN_BYTES = 4096 FETCH_BUFFER_SIZE_BYTES = 4096 +MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 ITER_TIMEOUT_SECONDS = 60 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 @@ -211,8 +212,10 @@ class SimpleConsumer(Consumer): auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit fetch_size_bytes: number of bytes to request in a FetchRequest - buffer_size: initial number of bytes to tell kafka we have - available. This will double every time it's not enough + buffer_size: default 4K. Initial number of bytes to tell kafka we + have available. This will double as needed. + max_buffer_size: default 16K. Max number of bytes to tell kafka we have + available. None means no limit. iter_timeout: default None. How much time (in seconds) to wait for a message in the iterator before exiting. None means no timeout, so it will wait forever. 
@@ -228,9 +231,15 @@ class SimpleConsumer(Consumer): auto_commit_every_t=AUTO_COMMIT_INTERVAL, fetch_size_bytes=FETCH_MIN_BYTES, buffer_size=FETCH_BUFFER_SIZE_BYTES, + max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, iter_timeout=None): + if max_buffer_size is not None and buffer_size > max_buffer_size: + raise ValueError("buffer_size (%d) is greater than " + "max_buffer_size (%d)" % + (buffer_size, max_buffer_size)) self.buffer_size = buffer_size + self.max_buffer_size = max_buffer_size self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes @@ -353,42 +362,54 @@ class SimpleConsumer(Consumer): # Create fetch request payloads for all the partitions requests = [] partitions = self.offsets.keys() - for partition in partitions: - requests.append(FetchRequest(self.topic, partition, - self.offsets[partition], - self.buffer_size)) - # Send request - responses = self.client.send_fetch_request( - requests, - max_wait_time=int(self.fetch_max_wait_time), - min_bytes=self.fetch_min_bytes) - - for resp in responses: - partition = resp.partition - try: - for message in resp.messages: - # Update partition offset - self.offsets[partition] = message.offset + 1 - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - - # Put the message in our queue - if self.partition_info: - self.queue.put((partition, message)) + while partitions: + for partition in partitions: + requests.append(FetchRequest(self.topic, partition, + self.offsets[partition], + self.buffer_size)) + # Send request + responses = self.client.send_fetch_request( + requests, + max_wait_time=int(self.fetch_max_wait_time), + min_bytes=self.fetch_min_bytes) + + retry_partitions = set() + for resp in responses: + partition = resp.partition + try: + for message in resp.messages: + # Update partition offset + self.offsets[partition] = message.offset + 1 + + # Count, check and commit 
messages if necessary + self.count_since_commit += 1 + self._auto_commit() + + # Put the message in our queue + if self.partition_info: + self.queue.put((partition, message)) + else: + self.queue.put(message) + except ConsumerFetchSizeTooSmall, e: + if (self.max_buffer_size is not None and + self.buffer_size == self.max_buffer_size): + log.error("Max fetch size %d too small", + self.max_buffer_size) + raise e + if self.max_buffer_size is None: + self.buffer_size *= 2 else: - self.queue.put(message) - except ConsumerFetchSizeTooSmall, e: - self.buffer_size *= 2 - log.warn("Fetch size too small, increase to %d (2x) and retry", - self.buffer_size) - except ConsumerNoMoreData, e: - log.debug("Iteration was ended by %r", e) - except StopIteration: - # Stop iterating through this partition - log.debug("Done iterating over partition %s" % partition) - + self.buffer_size = max([self.buffer_size * 2, + self.max_buffer_size]) + log.warn("Fetch size too small, increase to %d (2x) " + "and retry", self.buffer_size) + retry_partitions.add(partition) + except ConsumerNoMoreData, e: + log.debug("Iteration was ended by %r", e) + except StopIteration: + # Stop iterating through this partition + log.debug("Done iterating over partition %s" % partition) + partitions = retry_partitions def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): """ |