From fb118fb75c818a32d0bb81fe725faca0a714b580 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Wed, 25 Mar 2015 14:35:23 +0300 Subject: Manageable queue.put() operation for MPConsumer processes --- kafka/consumer/multiprocess.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) (limited to 'kafka/consumer/multiprocess.py') diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index a63b090..5ce8b4d 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -7,13 +7,14 @@ from collections import namedtuple from multiprocessing import Process, Manager as MPManager try: - from Queue import Empty + from Queue import Empty, Full except ImportError: # python 2 - from queue import Empty + from queue import Empty, Full from .base import ( AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL, - NO_MESSAGES_WAIT_TIME_SECONDS + NO_MESSAGES_WAIT_TIME_SECONDS, + FULL_QUEUE_WAIT_TIME_SECONDS ) from .simple import Consumer, SimpleConsumer @@ -59,7 +60,13 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options): message = consumer.get_message() if message: - queue.put(message) + while True: + try: + queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) + break + except Full: + if events.exit.is_set(): break + count += 1 # We have reached the required size. The controller might have -- cgit v1.2.1