author     Dana Powers <dana.powers@gmail.com>    2015-12-02 13:45:54 -0800
committer  Dana Powers <dana.powers@gmail.com>    2015-12-02 13:45:54 -0800
commit     da217442c18090a6e8b615a3af17f0bdd93c14c8 (patch)
tree       13b830bd2088d6488fef7312b13e7728506e2372
parent     a0f103eb3f8ff19926dfd25a1d5c114237c0bdfa (diff)
parent     7e19de555007b1059af78097aa70c8810a473513 (diff)
download   kafka-python-da217442c18090a6e8b615a3af17f0bdd93c14c8.tar.gz
Merge pull request #456 from barricadeio/mp-retry
Support retry semantics in MultiProcessConsumer
-rw-r--r--  kafka/conn.py                    4
-rw-r--r--  kafka/consumer/base.py           1
-rw-r--r--  kafka/consumer/multiprocess.py   104
3 files changed, 63 insertions, 46 deletions
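The change wraps the body of the _mp_consume worker in a retry loop: on a KafkaError the worker logs the problem, sleeps, doubles its wait, and starts over, capping the wait at the new MAX_BACKOFF_SECONDS constant. A minimal standalone sketch of that backoff pattern, assuming hypothetical do_work/should_exit stand-ins for the consumer body and the exit event (they are not part of the patch):

    import time

    MAX_BACKOFF_SECONDS = 60  # same cap the patch adds to kafka/consumer/base.py

    def run_with_retry(do_work, should_exit):
        """Sketch only: retry do_work() with exponential backoff until should_exit() is true."""
        interval = 1  # initial retry interval in seconds
        while not should_exit():
            try:
                do_work()
            except Exception as e:  # the patch catches KafkaError specifically
                print("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
                time.sleep(interval)
                # Double the wait after each failure, but never exceed the cap.
                interval = min(interval * 2, MAX_BACKOFF_SECONDS)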
diff --git a/kafka/conn.py b/kafka/conn.py
index 432e10b..e6a1f74 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -151,6 +151,10 @@ class KafkaConnection(local):
         """
         log.debug("Reading response %d from Kafka" % request_id)
 
+        # Make sure we have a connection
+        if not self._sock:
+            self.reinit()
+
         # Read the size off of the header
         resp = self._read_bytes(4)
         (size,) = struct.unpack('>i', resp)
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 25c01a1..c9f6e48 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -29,6 +29,7 @@ ITER_TIMEOUT_SECONDS = 60
 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
 FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
 
+MAX_BACKOFF_SECONDS = 60
 
 class Consumer(object):
     """
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index bd784cf..0b09102 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -9,11 +9,13 @@ except ImportError:
     from queue import Empty, Full  # python 2
 import time
 
+from ..common import KafkaError
 from .base import (
     Consumer,
     AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
     NO_MESSAGES_WAIT_TIME_SECONDS,
-    FULL_QUEUE_WAIT_TIME_SECONDS
+    FULL_QUEUE_WAIT_TIME_SECONDS,
+    MAX_BACKOFF_SECONDS,
 )
 from .simple import SimpleConsumer
 
@@ -33,57 +35,67 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
     functionality breaks unless this function is kept outside of a class
     """
 
-    # Make the child processes open separate socket connections
-    client.reinit()
+    # Initial interval for retries in seconds.
+    interval = 1
+    while not events.exit.is_set():
+        try:
+            # Make the child processes open separate socket connections
+            client.reinit()
 
-    # We will start consumers without auto-commit. Auto-commit will be
-    # done by the master controller process.
-    consumer = SimpleConsumer(client, group, topic,
-                              auto_commit=False,
-                              auto_commit_every_n=None,
-                              auto_commit_every_t=None,
-                              **consumer_options)
+            # We will start consumers without auto-commit. Auto-commit will be
+            # done by the master controller process.
+            consumer = SimpleConsumer(client, group, topic,
+                                      auto_commit=False,
+                                      auto_commit_every_n=None,
+                                      auto_commit_every_t=None,
+                                      **consumer_options)
 
-    # Ensure that the consumer provides the partition information
-    consumer.provide_partition_info()
+            # Ensure that the consumer provides the partition information
+            consumer.provide_partition_info()
 
-    while True:
-        # Wait till the controller indicates us to start consumption
-        events.start.wait()
-
-        # If we are asked to quit, do so
-        if events.exit.is_set():
-            break
-
-        # Consume messages and add them to the queue. If the controller
-        # indicates a specific number of messages, follow that advice
-        count = 0
-
-        message = consumer.get_message()
-        if message:
             while True:
-                try:
-                    queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
-                    break
-                except Full:
-                    if events.exit.is_set(): break
+                # Wait till the controller indicates us to start consumption
+                events.start.wait()
 
-            count += 1
-
-            # We have reached the required size. The controller might have
-            # more than what he needs. Wait for a while.
-            # Without this logic, it is possible that we run into a big
-            # loop consuming all available messages before the controller
-            # can reset the 'start' event
-            if count == size.value:
-                events.pause.wait()
-
-        else:
-            # In case we did not receive any message, give up the CPU for
-            # a while before we try again
-            time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+                # If we are asked to quit, do so
+                if events.exit.is_set():
+                    break
 
-    consumer.stop()
+                # Consume messages and add them to the queue. If the controller
+                # indicates a specific number of messages, follow that advice
+                count = 0
+
+                message = consumer.get_message()
+                if message:
+                    while True:
+                        try:
+                            queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
+                            break
+                        except Full:
+                            if events.exit.is_set(): break
+
+                    count += 1
+
+                    # We have reached the required size. The controller might have
+                    # more than what he needs. Wait for a while.
+                    # Without this logic, it is possible that we run into a big
+                    # loop consuming all available messages before the controller
+                    # can reset the 'start' event
+                    if count == size.value:
+                        events.pause.wait()
+
+                else:
+                    # In case we did not receive any message, give up the CPU for
+                    # a while before we try again
+                    time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+
+            consumer.stop()
+
+        except KafkaError as e:
+            # Retry with exponential backoff
+            log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
+            time.sleep(interval)
+            interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
 
 
 class MultiProcessConsumer(Consumer):
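For context, a typical way to drive MultiProcessConsumer with the kafka-python API of this era looks roughly like the sketch below; the broker address, group, and topic names are placeholders, not part of this change. With the patch applied, each worker process retries with exponential backoff instead of dying on a transient KafkaError.

    from kafka import KafkaClient, MultiProcessConsumer

    # Placeholder broker address; adjust for your deployment.
    client = KafkaClient('localhost:9092')

    # Split the topic's partitions across two worker processes.
    consumer = MultiProcessConsumer(client, b'my-group', b'my-topic', num_procs=2)

    for message in consumer:
        print(message)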