summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorOmar Ghishan <omar.ghishan@rd.io>2013-12-18 18:36:32 -0800
committerOmar Ghishan <omar.ghishan@rd.io>2014-01-06 15:14:50 -0800
commit0c7cf2569e384fcdde67b86689d64bafbaed953f (patch)
treedf875fdc92e07a1b345f97c4b97ed8bbbe0fe96d /kafka/consumer.py
parent5dd8d81c9e47ee21c22945b90221c67baa7852b9 (diff)
downloadkafka-python-0c7cf2569e384fcdde67b86689d64bafbaed953f.tar.gz
SimpleConsumer flow changes:
* Combine partition fetch requests into a single request * Put the messages received in a queue and update offsets * Grab as many messages from the queue as requested * When the queue is empty, request more * timeout param for get_messages() is the actual timeout for getting those messages * Based on https://github.com/mumrah/kafka-python/pull/74 - don't increase min_bytes if the consumer fetch buffer size is too small. Notes: Change MultiProcessConsumer and _mp_consume() accordingly. Previously, when querying each partition separately, it was possible to block waiting for messages on partition 0 even if there are new ones in partition 1. These changes allow us to block while waiting for messages on all partitions, and reduce total number of kafka requests. Use Queue.Queue for single proc Queue instead of already imported multiprocessing.Queue because the latter doesn't seem to guarantee immediate availability of items after a put: >>> from multiprocessing import Queue >>> q = Queue() >>> q.put(1); q.get_nowait() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait return self.get(False) File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get raise Empty Queue.Empty
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py182
1 files changed, 70 insertions, 112 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 600c8c7..a5a3e26 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -3,8 +3,8 @@ from itertools import izip_longest, repeat
import logging
import time
from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
-from Queue import Empty
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+from Queue import Empty, Queue
from kafka.common import (
ErrorMapping, FetchRequest,
@@ -227,6 +227,7 @@ class SimpleConsumer(Consumer):
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
self.fetch_started = defaultdict(bool) # defaults to false
+ self.queue = Queue(buffer_size)
super(SimpleConsumer, self).__init__(
client, group, topic,
@@ -292,122 +293,75 @@ class SimpleConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+ timeout: If block is True, the function will block for the specified time (in seconds)
+ until count messages is fetched. If None, it will block forever.
"""
messages = []
- iterator = self.__iter__()
-
- # HACK: This splits the timeout between available partitions
if timeout:
- timeout = timeout * 1.0 / len(self.offsets)
+ max_time = time.time() + timeout
- with FetchContext(self, block, timeout):
- while count > 0:
- try:
- messages.append(next(iterator))
- except StopIteration:
- break
+ while count > 0 and (timeout is None or timeout > 0):
+ message = self.get_message(block, timeout)
+ if message:
+ messages.append(message)
count -= 1
+ else:
+ # Ran out of messages for the last request. If we're not blocking, break.
+ if not block:
+ break
+ if timeout:
+ timeout = max_time - time.time()
return messages
- def __iter__(self):
- """
- Create an iterate per partition. Iterate through them calling next()
- until they are all exhausted.
- """
- iters = {}
- for partition, offset in self.offsets.items():
- iters[partition] = self.__iter_partition__(partition, offset)
-
- if len(iters) == 0:
- return
-
- while True:
- if len(iters) == 0:
- break
-
- for partition, it in iters.items():
- try:
- if self.partition_info:
- yield (partition, it.next())
- else:
- yield it.next()
- except StopIteration:
- log.debug("Done iterating over partition %s" % partition)
- del iters[partition]
-
- # skip auto-commit since we didn't yield anything
- continue
-
- # Count, check and commit messages if necessary
- self.count_since_commit += 1
- self._auto_commit()
-
- def __iter_partition__(self, partition, offset):
- """
- Iterate over the messages in a partition. Create a FetchRequest
- to get back a batch of messages, yield them one at a time.
- After a batch is exhausted, start a new batch unless we've reached
- the end of this partition.
- """
-
- # The offset that is stored in the consumer is the offset that
- # we have consumed. In subsequent iterations, we are supposed to
- # fetch the next message (that is from the next offset)
- # However, for the 0th message, the offset should be as-is.
- # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
- # problematic, since 0 is offset of a message which we have not yet
- # consumed.
- if self.fetch_started[partition]:
- offset += 1
-
- fetch_size = self.fetch_min_bytes
+ def get_message(self, block=True, timeout=0.1):
+ if self.queue.empty():
+ with FetchContext(self, block, timeout):
+ self._fetch()
+ try:
+ return self.queue.get_nowait()
+ except Empty:
+ return None
+ def __iter__(self):
while True:
- # use MaxBytes = client's bufsize since we're only
- # fetching one topic + partition
- req = FetchRequest(
- self.topic, partition, offset, self.buffer_size)
-
- (resp,) = self.client.send_fetch_request(
- [req],
- max_wait_time=self.fetch_max_wait_time,
- min_bytes=fetch_size)
-
- assert resp.topic == self.topic
- assert resp.partition == partition
+ message = self.get_message(True, 100)
+ if message:
+ yield message
+ else:
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
+ time.sleep(0.1)
- next_offset = None
+ def _fetch(self):
+ requests = []
+ partitions = self.offsets.keys()
+ for partition in partitions:
+ requests.append(FetchRequest(self.topic, partition, self.offsets[partition], self.buffer_size))
+ responses = self.client.send_fetch_request(
+ requests,
+ max_wait_time=int(self.fetch_max_wait_time),
+ min_bytes=self.fetch_min_bytes)
+ for resp in responses:
+ partition = resp.partition
try:
for message in resp.messages:
- next_offset = message.offset
-
- # update the offset before the message is yielded. This
- # is so that the consumer state is not lost in certain
- # cases.
- #
- # For eg: the message is yielded and consumed by the
- # caller, but the caller does not come back into the
- # generator again. The message will be consumed but the
- # status will not be updated in the consumer
- self.fetch_started[partition] = True
- self.offsets[partition] = message.offset
- yield message
+ self.offsets[partition] = message.offset + 1
+ # Count, check and commit messages if necessary
+ self.count_since_commit += 1
+ self._auto_commit()
+ if self.partition_info:
+ self.queue.put((partition, message))
+ else:
+ self.queue.put(message)
except ConsumerFetchSizeTooSmall, e:
- fetch_size *= 1.5
- log.warn(
- "Fetch size too small, increasing to %d (1.5x) and retry",
- fetch_size)
- continue
+ self.buffer_size *= 2
+ log.warn("Fetch size too small, increasing to %d (2x) and retry", self.buffer_size)
except ConsumerNoMoreData, e:
log.debug("Iteration was ended by %r", e)
-
- if next_offset is None:
- break
- else:
- offset = next_offset + 1
+ except StopIteration:
+ # Stop iterating through this partition
+ log.debug("Done iterating over partition %s" % partition)
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
@@ -446,8 +400,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# indicates a specific number of messages, follow that advice
count = 0
- for partition, message in consumer:
- queue.put((partition, message))
+ message = consumer.get_message()
+ if message:
+ queue.put(message)
count += 1
# We have reached the required size. The controller might have
@@ -457,11 +412,10 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# can reset the 'start' event
if count == size.value:
pause.wait()
- break
- # In case we did not receive any message, give up the CPU for
- # a while before we try again
- if count == 0:
+ else:
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
time.sleep(0.1)
consumer.stop()
@@ -507,7 +461,7 @@ class MultiProcessConsumer(Consumer):
# Variables for managing and controlling the data flow from
# consumer child process to master
- self.queue = Queue(1024) # Child consumers dump messages into this
+ self.queue = MPQueue(1024) # Child consumers dump messages into this
self.start = Event() # Indicates the consumers to start fetch
self.exit = Event() # Requests the consumers to shutdown
self.pause = Event() # Requests the consumers to pause fetch
@@ -589,8 +543,8 @@ class MultiProcessConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+ timeout: If block is True, the function will block for the specified time (in seconds)
+ until count messages is fetched. If None, it will block forever.
"""
messages = []
@@ -601,7 +555,10 @@ class MultiProcessConsumer(Consumer):
self.size.value = count
self.pause.clear()
- while count > 0:
+ if timeout:
+ max_time = time.time() + timeout
+
+ while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
# go into overdrive and keep consuming thousands of
@@ -621,6 +578,7 @@ class MultiProcessConsumer(Consumer):
self.count_since_commit += 1
self._auto_commit()
count -= 1
+ timeout = max_time - time.time()
self.size.value = 0
self.start.clear()