summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- kafka/client.py                     28
-rw-r--r-- kafka/producer.py                    7
-rw-r--r-- test/test_producer_integration.py    9
-rw-r--r-- test/testutil.py                    13
4 files changed, 34 insertions(+), 23 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index d0e07d0..9cb4b48 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,11 +1,11 @@
+import collections
import copy
+import functools
+import itertools
import logging
-import collections
-
+import time
import kafka.common
-from functools import partial
-from itertools import count
from kafka.common import (TopicAndPartition,
ConnectionError, FailedPayloadsError,
PartitionUnavailableError,
@@ -21,7 +21,7 @@ log = logging.getLogger("kafka")
class KafkaClient(object):
CLIENT_ID = "kafka-python"
- ID_GEN = count()
+ ID_GEN = itertools.count()
# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
@@ -213,6 +213,16 @@ class KafkaClient(object):
def has_metadata_for_topic(self, topic):
return topic in self.topic_partitions
+ def ensure_topic_exists(self, topic, timeout = 30):
+ start_time = time.time()
+
+ self.load_metadata_for_topics(topic)
+ while not self.has_metadata_for_topic(topic):
+ if time.time() > start_time + timeout:
+ raise KafkaTimeoutError("Unable to create topic {}".format(topic))
+ self.load_metadata_for_topics(topic)
+ time.sleep(.5)
+
def close(self):
for conn in self.conns.values():
conn.close()
@@ -289,7 +299,7 @@ class KafkaClient(object):
order of input payloads
"""
- encoder = partial(
+ encoder = functools.partial(
KafkaProtocol.encode_produce_request,
acks=acks,
timeout=timeout)
@@ -321,7 +331,7 @@ class KafkaClient(object):
to the same brokers.
"""
- encoder = partial(KafkaProtocol.encode_fetch_request,
+ encoder = functools.partial(KafkaProtocol.encode_fetch_request,
max_wait_time=max_wait_time,
min_bytes=min_bytes)
@@ -359,7 +369,7 @@ class KafkaClient(object):
def send_offset_commit_request(self, group, payloads=[],
fail_on_error=True, callback=None):
- encoder = partial(KafkaProtocol.encode_offset_commit_request,
+ encoder = functools.partial(KafkaProtocol.encode_offset_commit_request,
group=group)
decoder = KafkaProtocol.decode_offset_commit_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
@@ -378,7 +388,7 @@ class KafkaClient(object):
def send_offset_fetch_request(self, group, payloads=[],
fail_on_error=True, callback=None):
- encoder = partial(KafkaProtocol.encode_offset_fetch_request,
+ encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
group=group)
decoder = KafkaProtocol.decode_offset_fetch_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
diff --git a/kafka/producer.py b/kafka/producer.py
index 8e40be5..95c75c4 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -10,7 +10,7 @@ from itertools import cycle
from multiprocessing import Queue, Process
from kafka.common import (
- ProduceRequest, TopicAndPartition, UnsupportedCodecError
+ ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
)
from kafka.partitioner import HashedPartitioner
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
@@ -216,7 +216,10 @@ class SimpleProducer(Producer):
if topic not in self.partition_cycles:
if topic not in self.client.topic_partitions:
self.client.load_metadata_for_topics(topic)
- self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+ try:
+ self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+ except KeyError:
+ raise UnknownTopicOrPartitionError(topic)
# Randomize the initial partition that is returned
if self.random_start:
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index c69e117..0718cb3 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -143,6 +143,15 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
@kafka_versions("all")
+ def test_produce__new_topic_fails_with_reasonable_error(self):
+ new_topic = 'new_topic_{}'.format(str(uuid.uuid4()))
+ producer = SimpleProducer(self.client)
+
+ # At first it doesn't exist
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ resp = producer.send_messages(new_topic, self.msg("one"))
+
+ @kafka_versions("all")
def test_producer_random_order(self):
producer = SimpleProducer(self.client, random_start = True)
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
diff --git a/test/testutil.py b/test/testutil.py
index 78e6f7d..4f5f6ee 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -13,7 +13,6 @@ from kafka import KafkaClient
__all__ = [
'random_string',
- 'ensure_topic_creation',
'get_open_port',
'kafka_versions',
'KafkaIntegrationTestCase',
@@ -39,16 +38,6 @@ def kafka_versions(*versions):
return wrapper
return kafka_versions
-def ensure_topic_creation(client, topic_name, timeout = 30):
- start_time = time.time()
-
- client.load_metadata_for_topics(topic_name)
- while not client.has_metadata_for_topic(topic_name):
- if time.time() > start_time + timeout:
- raise Exception("Unable to create topic %s" % topic_name)
- client.load_metadata_for_topics(topic_name)
- time.sleep(1)
-
def get_open_port():
sock = socket.socket()
sock.bind(("", 0))
@@ -71,7 +60,7 @@ class KafkaIntegrationTestCase(unittest2.TestCase):
if self.create_client:
self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))
- ensure_topic_creation(self.client, self.topic)
+ self.client.ensure_topic_exists(self.topic)
self._messages = {}