From 7e19de555007b1059af78097aa70c8810a473513 Mon Sep 17 00:00:00 2001 From: Ross Duggan Date: Mon, 14 Sep 2015 16:30:01 +0100 Subject: Support retry semantics in MultiProcessConsumer --- kafka/conn.py | 4 ++ kafka/consumer/base.py | 1 + kafka/consumer/multiprocess.py | 104 +++++++++++++++++++++++------------------ 3 files changed, 63 insertions(+), 46 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 432e10b..e6a1f74 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -151,6 +151,10 @@ class KafkaConnection(local): """ log.debug("Reading response %d from Kafka" % request_id) + # Make sure we have a connection + if not self._sock: + self.reinit() + # Read the size off of the header resp = self._read_bytes(4) (size,) = struct.unpack('>i', resp) diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 0800327..a22d039 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -29,6 +29,7 @@ ITER_TIMEOUT_SECONDS = 60 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 FULL_QUEUE_WAIT_TIME_SECONDS = 0.1 +MAX_BACKOFF_SECONDS = 60 class Consumer(object): """ diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index d03eb95..0bef4c5 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -9,11 +9,13 @@ except ImportError: from queue import Empty, Full # python 2 import time +from ..common import KafkaError from .base import ( Consumer, AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL, NO_MESSAGES_WAIT_TIME_SECONDS, - FULL_QUEUE_WAIT_TIME_SECONDS + FULL_QUEUE_WAIT_TIME_SECONDS, + MAX_BACKOFF_SECONDS, ) from .simple import SimpleConsumer @@ -33,57 +35,67 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options): functionality breaks unless this function is kept outside of a class """ - # Make the child processes open separate socket connections - client.reinit() + # Initial interval for retries in seconds. + interval = 1 + while not events.exit.is_set(): + try: + # Make the child processes open separate socket connections + client.reinit() - # We will start consumers without auto-commit. Auto-commit will be - # done by the master controller process. - consumer = SimpleConsumer(client, group, topic, - auto_commit=False, - auto_commit_every_n=None, - auto_commit_every_t=None, - **consumer_options) + # We will start consumers without auto-commit. Auto-commit will be + # done by the master controller process. + consumer = SimpleConsumer(client, group, topic, + auto_commit=False, + auto_commit_every_n=None, + auto_commit_every_t=None, + **consumer_options) - # Ensure that the consumer provides the partition information - consumer.provide_partition_info() + # Ensure that the consumer provides the partition information + consumer.provide_partition_info() - while True: - # Wait till the controller indicates us to start consumption - events.start.wait() - - # If we are asked to quit, do so - if events.exit.is_set(): - break - - # Consume messages and add them to the queue. If the controller - # indicates a specific number of messages, follow that advice - count = 0 - - message = consumer.get_message() - if message: while True: - try: - queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) - break - except Full: - if events.exit.is_set(): break + # Wait till the controller indicates us to start consumption + events.start.wait() - count += 1 - - # We have reached the required size. The controller might have - # more than what he needs. Wait for a while. - # Without this logic, it is possible that we run into a big - # loop consuming all available messages before the controller - # can reset the 'start' event - if count == size.value: - events.pause.wait() - - else: - # In case we did not receive any message, give up the CPU for - # a while before we try again - time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) + # If we are asked to quit, do so + if events.exit.is_set(): + break - consumer.stop() + # Consume messages and add them to the queue. If the controller + # indicates a specific number of messages, follow that advice + count = 0 + + message = consumer.get_message() + if message: + while True: + try: + queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) + break + except Full: + if events.exit.is_set(): break + + count += 1 + + # We have reached the required size. The controller might have + # more than what he needs. Wait for a while. + # Without this logic, it is possible that we run into a big + # loop consuming all available messages before the controller + # can reset the 'start' event + if count == size.value: + events.pause.wait() + + else: + # In case we did not receive any message, give up the CPU for + # a while before we try again + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) + + consumer.stop() + + except KafkaError as e: + # Retry with exponential backoff + log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval)) + time.sleep(interval) + interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS class MultiProcessConsumer(Consumer): -- cgit v1.2.1