summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/usage.rst4
-rw-r--r--kafka/producer/keyed.py11
-rw-r--r--test/test_producer_integration.py20
3 files changed, 18 insertions, 17 deletions
diff --git a/docs/usage.rst b/docs/usage.rst
index 150d121..3e5f434 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -63,8 +63,8 @@ Keyed messages
# HashedPartitioner is default
producer = KeyedProducer(kafka)
- producer.send("my-topic", "key1", "some message")
- producer.send("my-topic", "key2", "this methode")
+ producer.send_messages("my-topic", "key1", "some message")
+ producer.send_messages("my-topic", "key2", "this methode")
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 333b6c0..bc42803 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -1,6 +1,7 @@
from __future__ import absolute_import
import logging
+import warnings
from kafka.partitioner import HashedPartitioner
from kafka.util import kafka_bytestring
@@ -58,15 +59,15 @@ class KeyedProducer(Producer):
partitioner = self.partitioners[topic]
return partitioner.partition(key)
- def send_messages(self,topic,key,*msg):
+ def send_messages(self, topic, key, *msg):
topic = kafka_bytestring(topic)
partition = self._next_partition(topic, key)
- return self._send_messages(topic, partition, *msg,key=key)
+ return self._send_messages(topic, partition, *msg, key=key)
+ # DEPRECATED
def send(self, topic, key, msg):
- topic = kafka_bytestring(topic)
- partition = self._next_partition(topic, key)
- return self._send_messages(topic, partition, msg, key=key)
+ warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", DeprecationWarning)
+ return self.send_messages(topic, key, msg)
def __repr__(self):
return '<KeyedProducer batch=%s>' % self.async
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index e3f7767..c81716d 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -332,10 +332,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
- resp1 = producer.send(self.topic, self.key("key1"), self.msg("one"))
- resp2 = producer.send(self.topic, self.key("key2"), self.msg("two"))
- resp3 = producer.send(self.topic, self.key("key3"), self.msg("three"))
- resp4 = producer.send(self.topic, self.key("key4"), self.msg("four"))
+ resp1 = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
+ resp2 = producer.send_messages(self.topic, self.key("key2"), self.msg("two"))
+ resp3 = producer.send_messages(self.topic, self.key("key3"), self.msg("three"))
+ resp4 = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
self.assert_produce_response(resp1, start_offsets[0]+0)
self.assert_produce_response(resp2, start_offsets[1]+0)
@@ -353,11 +353,11 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
- resp1 = producer.send(self.topic, self.key("1"), self.msg("one"))
- resp2 = producer.send(self.topic, self.key("2"), self.msg("two"))
- resp3 = producer.send(self.topic, self.key("3"), self.msg("three"))
- resp4 = producer.send(self.topic, self.key("3"), self.msg("four"))
- resp5 = producer.send(self.topic, self.key("4"), self.msg("five"))
+ resp1 = producer.send_messages(self.topic, self.key("1"), self.msg("one"))
+ resp2 = producer.send_messages(self.topic, self.key("2"), self.msg("two"))
+ resp3 = producer.send_messages(self.topic, self.key("3"), self.msg("three"))
+ resp4 = producer.send_messages(self.topic, self.key("3"), self.msg("four"))
+ resp5 = producer.send_messages(self.topic, self.key("4"), self.msg("five"))
offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]}
messages = {partitions[0]: [], partitions[1]: []}
@@ -386,7 +386,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
- resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
+ resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
self.assertEqual(len(resp), 0)
# wait for the server to report a new highwatermark