author     Dana Powers <dana.powers@gmail.com>    2015-12-02 13:45:54 -0800
committer  Dana Powers <dana.powers@gmail.com>    2015-12-02 13:45:54 -0800
commit     da217442c18090a6e8b615a3af17f0bdd93c14c8 (patch)
tree       13b830bd2088d6488fef7312b13e7728506e2372
parent     a0f103eb3f8ff19926dfd25a1d5c114237c0bdfa (diff)
parent     7e19de555007b1059af78097aa70c8810a473513 (diff)
download   kafka-python-da217442c18090a6e8b615a3af17f0bdd93c14c8.tar.gz
Merge pull request #456 from barricadeio/mp-retry
Support retry semantics in MultiProcessConsumer
-rw-r--r--  kafka/conn.py                     4
-rw-r--r--  kafka/consumer/base.py            1
-rw-r--r--  kafka/consumer/multiprocess.py    104
3 files changed, 63 insertions, 46 deletions
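
In short, the merge wraps the consumer worker's body in a retry loop with capped exponential backoff. The following standalone sketch (not part of the diff) illustrates the pattern in isolation; run_with_retries, do_work and TransientError are hypothetical names, while the 1-second starting interval and 60-second cap mirror the values introduced below.

import logging
import time

log = logging.getLogger(__name__)

MAX_BACKOFF_SECONDS = 60  # same cap the patch adds in kafka/consumer/base.py


class TransientError(Exception):
    """Hypothetical stand-in for kafka.common.KafkaError."""


def run_with_retries(do_work, should_exit):
    """Keep calling do_work(), backing off exponentially after each failure."""
    interval = 1  # initial retry interval in seconds
    while not should_exit():
        try:
            do_work()
        except TransientError as e:
            log.error("Transient failure (%s), retrying in %d seconds...", e, interval)
            time.sleep(interval)
            # Double the wait, but never exceed the cap.
            interval = min(interval * 2, MAX_BACKOFF_SECONDS)

Here min(interval * 2, MAX_BACKOFF_SECONDS) is simply a compact equivalent of the conditional expression the patch itself uses.
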
diff --git a/kafka/conn.py b/kafka/conn.py
index 432e10b..e6a1f74 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -151,6 +151,10 @@ class KafkaConnection(local):
         """
         log.debug("Reading response %d from Kafka" % request_id)
 
+        # Make sure we have a connection
+        if not self._sock:
+            self.reinit()
+
         # Read the size off of the header
         resp = self._read_bytes(4)
         (size,) = struct.unpack('>i', resp)
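
The hunk above guards the read path: if the socket has been dropped (self._sock is None), the connection is reinitialized before attempting to read. A minimal sketch of that lazy-reconnect idea, using an illustrative Connection class rather than the real KafkaConnection:

import socket
import struct


class Connection(object):
    """Illustrative wrapper; not the real KafkaConnection."""

    def __init__(self, host, port):
        self.host, self.port = host, port
        self._sock = None

    def reinit(self):
        # (Re)open the socket from scratch.
        self._sock = socket.create_connection((self.host, self.port))

    def close(self):
        if self._sock:
            self._sock.close()
            self._sock = None

    def _read_bytes(self, num_bytes):
        data = b''
        while len(data) < num_bytes:
            chunk = self._sock.recv(num_bytes - len(data))
            if not chunk:
                raise ConnectionError('socket closed while reading')
            data += chunk
        return data

    def recv_size_prefixed(self):
        # Make sure we have a connection before reading, as in the patch.
        if not self._sock:
            self.reinit()
        # Read a 4-byte big-endian size header, then the payload.
        (size,) = struct.unpack('>i', self._read_bytes(4))
        return self._read_bytes(size)
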
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 25c01a1..c9f6e48 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -29,6 +29,7 @@ ITER_TIMEOUT_SECONDS = 60
 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
 FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
+MAX_BACKOFF_SECONDS = 60
 
 
 class Consumer(object):
     """
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index bd784cf..0b09102 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -9,11 +9,13 @@ except ImportError:
     from queue import Empty, Full  # python 2
 import time
 
+from ..common import KafkaError
 from .base import (
     Consumer,
     AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
     NO_MESSAGES_WAIT_TIME_SECONDS,
-    FULL_QUEUE_WAIT_TIME_SECONDS
+    FULL_QUEUE_WAIT_TIME_SECONDS,
+    MAX_BACKOFF_SECONDS,
 )
 from .simple import SimpleConsumer
 
@@ -33,57 +35,67 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
     functionality breaks unless this function is kept outside of a class
     """
 
-    # Make the child processes open separate socket connections
-    client.reinit()
+    # Initial interval for retries in seconds.
+    interval = 1
+    while not events.exit.is_set():
+        try:
+            # Make the child processes open separate socket connections
+            client.reinit()
 
-    # We will start consumers without auto-commit. Auto-commit will be
-    # done by the master controller process.
-    consumer = SimpleConsumer(client, group, topic,
-                              auto_commit=False,
-                              auto_commit_every_n=None,
-                              auto_commit_every_t=None,
-                              **consumer_options)
+            # We will start consumers without auto-commit. Auto-commit will be
+            # done by the master controller process.
+            consumer = SimpleConsumer(client, group, topic,
+                                      auto_commit=False,
+                                      auto_commit_every_n=None,
+                                      auto_commit_every_t=None,
+                                      **consumer_options)
 
-    # Ensure that the consumer provides the partition information
-    consumer.provide_partition_info()
+            # Ensure that the consumer provides the partition information
+            consumer.provide_partition_info()
 
-    while True:
-        # Wait till the controller indicates us to start consumption
-        events.start.wait()
-
-        # If we are asked to quit, do so
-        if events.exit.is_set():
-            break
-
-        # Consume messages and add them to the queue. If the controller
-        # indicates a specific number of messages, follow that advice
-        count = 0
-
-        message = consumer.get_message()
-        if message:
             while True:
-                try:
-                    queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
-                    break
-                except Full:
-                    if events.exit.is_set(): break
+                # Wait till the controller indicates us to start consumption
+                events.start.wait()
 
-            count += 1
-
-            # We have reached the required size. The controller might have
-            # more than what he needs. Wait for a while.
-            # Without this logic, it is possible that we run into a big
-            # loop consuming all available messages before the controller
-            # can reset the 'start' event
-            if count == size.value:
-                events.pause.wait()
-
-        else:
-            # In case we did not receive any message, give up the CPU for
-            # a while before we try again
-            time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+                # If we are asked to quit, do so
+                if events.exit.is_set():
+                    break
 
-    consumer.stop()
+                # Consume messages and add them to the queue. If the controller
+                # indicates a specific number of messages, follow that advice
+                count = 0
+
+                message = consumer.get_message()
+                if message:
+                    while True:
+                        try:
+                            queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
+                            break
+                        except Full:
+                            if events.exit.is_set(): break
+
+                    count += 1
+
+                    # We have reached the required size. The controller might have
+                    # more than what he needs. Wait for a while.
+                    # Without this logic, it is possible that we run into a big
+                    # loop consuming all available messages before the controller
+                    # can reset the 'start' event
+                    if count == size.value:
+                        events.pause.wait()
+
+                else:
+                    # In case we did not receive any message, give up the CPU for
+                    # a while before we try again
+                    time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+
+            consumer.stop()
+
+        except KafkaError as e:
+            # Retry with exponential backoff
+            log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
+            time.sleep(interval)
+            interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
 
 
 class MultiProcessConsumer(Consumer):
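
With this merge, a worker process that hits a KafkaError reconnects and resumes on its own instead of exiting. A minimal usage sketch, assuming the kafka-python API of this era; the host, group, and topic strings are placeholders:

from kafka import KafkaClient, MultiProcessConsumer

client = KafkaClient('localhost:9092')

# Two worker processes; each now retries with capped exponential backoff
# on KafkaError instead of dying when the broker is temporarily unreachable.
consumer = MultiProcessConsumer(client, 'my-group', 'my-topic', num_procs=2)

for message in consumer:
    print(message)
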