summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-03-23 05:58:55 -0700
committerGitHub <noreply@github.com>2018-03-23 05:58:55 -0700
commitb62006aeb86258b4b1ef2735bebb1fe99459b82d (patch)
tree9a5cab083163b8e5d952d1bb0f3bb7141ffb746a
parent204388b0928c02a339eb84b376c74851eb074e69 (diff)
downloadkafka-python-b62006aeb86258b4b1ef2735bebb1fe99459b82d.tar.gz
Change SimpleProducer to use async_send (async is reserved in py37) (#1454)
-rw-r--r--docs/simple.rst8
-rw-r--r--kafka/producer/base.py38
-rw-r--r--kafka/producer/keyed.py2
-rw-r--r--kafka/producer/simple.py2
-rw-r--r--test/test_failover_integration.py8
-rw-r--r--test/test_producer_integration.py8
-rw-r--r--test/test_producer_legacy.py10
7 files changed, 42 insertions, 34 deletions
diff --git a/docs/simple.rst b/docs/simple.rst
index 8192a8b..afdb975 100644
--- a/docs/simple.rst
+++ b/docs/simple.rst
@@ -49,7 +49,7 @@ Asynchronous Mode
# To send messages asynchronously
client = SimpleClient('localhost:9092')
- producer = SimpleProducer(client, async=True)
+ producer = SimpleProducer(client, async_send=True)
producer.send_messages('my-topic', b'async message')
# To send messages in batch. You can use any of the available
@@ -60,7 +60,7 @@ Asynchronous Mode
# * If the producer dies before the messages are sent, there will be losses
# * Call producer.stop() to send the messages and cleanup
producer = SimpleProducer(client,
- async=True,
+ async_send=True,
batch_send_every_n=20,
batch_send_every_t=60)
@@ -73,7 +73,7 @@ Synchronous Mode
# To send messages synchronously
client = SimpleClient('localhost:9092')
- producer = SimpleProducer(client, async=False)
+ producer = SimpleProducer(client, async_send=False)
# Note that the application is responsible for encoding messages to type bytes
producer.send_messages('my-topic', b'some message')
@@ -88,7 +88,7 @@ Synchronous Mode
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
# by all in sync replicas before sending a response
producer = SimpleProducer(client,
- async=False,
+ async_send=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=2000,
sync_fail_on_error=False)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index c038bd3..e8d6c3d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -226,7 +226,7 @@ class Producer(object):
Arguments:
client (kafka.SimpleClient): instance to use for broker
- communications. If async=True, the background thread will use
+ communications. If async_send=True, the background thread will use
:meth:`client.copy`, which is expected to return a thread-safe
object.
codec (kafka.protocol.ALL_CODECS): compression codec to use.
@@ -238,11 +238,11 @@ class Producer(object):
sync_fail_on_error (bool, optional): whether sync producer should
raise exceptions (True), or just return errors (False),
defaults to True.
- async (bool, optional): send message using a background thread,
+ async_send (bool, optional): send message using a background thread,
defaults to False.
- batch_send_every_n (int, optional): If async is True, messages are
+ batch_send_every_n (int, optional): If async_send is True, messages are
sent in batches of this size, defaults to 20.
- batch_send_every_t (int or float, optional): If async is True,
+ batch_send_every_t (int or float, optional): If async_send is True,
messages are sent immediately after this timeout in seconds, even
if there are fewer than batch_send_every_n, defaults to 20.
async_retry_limit (int, optional): number of retries for failed messages
@@ -268,8 +268,10 @@ class Producer(object):
defaults to 30.
Deprecated Arguments:
+ async (bool, optional): send message using a background thread,
+ defaults to False. Deprecated, use 'async_send'
batch_send (bool, optional): If True, messages are sent by a background
- thread in batches, defaults to False. Deprecated, use 'async'
+ thread in batches, defaults to False. Deprecated, use 'async_send'
"""
ACK_NOT_REQUIRED = 0 # No ack is required
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
@@ -282,8 +284,8 @@ class Producer(object):
codec=None,
codec_compresslevel=None,
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
- async=False,
- batch_send=False, # deprecated, use async
+ async_send=False,
+ batch_send=False, # deprecated, use async_send
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
@@ -292,15 +294,21 @@ class Producer(object):
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
- async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
+ async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
+ **kwargs):
+
+ # async renamed async_send for python3.7 support
+ if 'async' in kwargs:
+ log.warning('Deprecated async option found -- use async_send')
+ async_send = kwargs['async']
- if async:
+ if async_send:
assert batch_send_every_n > 0
assert batch_send_every_t > 0
assert async_queue_maxsize >= 0
self.client = client
- self.async = async
+ self.async_send = async_send
self.req_acks = req_acks
self.ack_timeout = ack_timeout
self.stopped = False
@@ -313,7 +321,7 @@ class Producer(object):
self.codec = codec
self.codec_compresslevel = codec_compresslevel
- if self.async:
+ if self.async_send:
# Messages are sent through this queue
self.queue = Queue(async_queue_maxsize)
self.async_queue_put_timeout = async_queue_put_timeout
@@ -400,7 +408,7 @@ class Producer(object):
if key is not None and not isinstance(key, six.binary_type):
raise TypeError("the key must be type bytes")
- if self.async:
+ if self.async_send:
for idx, m in enumerate(msg):
try:
item = (TopicPartition(topic, partition), m, key)
@@ -435,7 +443,7 @@ class Producer(object):
log.warning('timeout argument to stop() is deprecated - '
'it will be removed in future release')
- if not self.async:
+ if not self.async_send:
log.warning('producer.stop() called, but producer is not async')
return
@@ -443,7 +451,7 @@ class Producer(object):
log.warning('producer.stop() called, but producer is already stopped')
return
- if self.async:
+ if self.async_send:
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
self.thread_stop_event.set()
self.thread.join()
@@ -471,5 +479,5 @@ class Producer(object):
self.stopped = True
def __del__(self):
- if self.async and not self.stopped:
+ if self.async_send and not self.stopped:
self.stop()
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 8de3ad8..62bb733 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -46,4 +46,4 @@ class KeyedProducer(Producer):
return self.send_messages(topic, key, msg)
def __repr__(self):
- return '<KeyedProducer batch=%s>' % self.async
+ return '<KeyedProducer batch=%s>' % self.async_send
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 589363c..91e0abc 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -51,4 +51,4 @@ class SimpleProducer(Producer):
)
def __repr__(self):
- return '<SimpleProducer batch=%s>' % self.async
+ return '<SimpleProducer batch=%s>' % self.async_send
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 8531cfb..797e1c8 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -60,7 +60,7 @@ class TestFailover(KafkaIntegrationTestCase):
# require that the server commit messages to all in-sync replicas
# so that failover doesn't lose any messages on server-side
# and we can assert that server-side message count equals client-side
- producer = Producer(self.client, async=False,
+ producer = Producer(self.client, async_send=False,
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
# Send 100 random messages to a specific partition
@@ -101,7 +101,7 @@ class TestFailover(KafkaIntegrationTestCase):
partition = 0
# Test the base class Producer -- send_messages to a specific partition
- producer = Producer(self.client, async=True,
+ producer = Producer(self.client, async_send=True,
batch_send_every_n=15,
batch_send_every_t=3,
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
@@ -146,7 +146,7 @@ class TestFailover(KafkaIntegrationTestCase):
def test_switch_leader_keyed_producer(self):
topic = self.topic
- producer = KeyedProducer(self.client, async=False)
+ producer = KeyedProducer(self.client, async_send=False)
# Send 10 random messages
for _ in range(10):
@@ -182,7 +182,7 @@ class TestFailover(KafkaIntegrationTestCase):
producer.send_messages(topic, key, msg)
def test_switch_leader_simple_consumer(self):
- producer = Producer(self.client, async=False)
+ producer = Producer(self.client, async_send=False)
consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
self._send_random_messages(producer, self.topic, 0, 2)
consumer.get_messages()
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 6cd3d13..2b81047 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -216,7 +216,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
start_offset = self.current_offset(self.topic, partition)
- producer = SimpleProducer(self.client, async=True, random_start=False)
+ producer = SimpleProducer(self.client, async_send=True, random_start=False)
resp = producer.send_messages(self.topic, self.msg("one"))
self.assertEqual(len(resp), 0)
@@ -235,7 +235,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
batch_interval = 5
producer = SimpleProducer(
self.client,
- async=True,
+ async_send=True,
batch_send_every_n=batch_messages,
batch_send_every_t=batch_interval,
random_start=False)
@@ -300,7 +300,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
batch_interval = 5
producer = SimpleProducer(
self.client,
- async=True,
+ async_send=True,
batch_send_every_n=100,
batch_send_every_t=batch_interval,
random_start=False)
@@ -432,7 +432,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer = KeyedProducer(self.client,
partitioner=RoundRobinPartitioner,
- async=True,
+ async_send=True,
batch_send_every_t=1)
resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py
index 9b87c76..6d00116 100644
--- a/test/test_producer_legacy.py
+++ b/test/test_producer_legacy.py
@@ -73,7 +73,7 @@ class TestKafkaProducer(unittest.TestCase):
@patch('kafka.producer.base._send_upstream')
def test_producer_async_queue_overfilled(self, mock):
queue_size = 2
- producer = Producer(MagicMock(), async=True,
+ producer = Producer(MagicMock(), async_send=True,
async_queue_maxsize=queue_size)
topic = b'test-topic'
@@ -95,25 +95,25 @@ class TestKafkaProducer(unittest.TestCase):
with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
client = SimpleClient(MagicMock())
- producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
+ producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False)
# This should not raise
(response,) = producer.send_messages('foobar', b'test message')
self.assertEqual(response, error)
- producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
+ producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True)
with self.assertRaises(FailedPayloadsError):
producer.send_messages('foobar', b'test message')
def test_cleanup_is_not_called_on_stopped_producer(self):
- producer = Producer(MagicMock(), async=True)
+ producer = Producer(MagicMock(), async_send=True)
producer.stopped = True
with patch.object(producer, 'stop') as mocked_stop:
producer._cleanup_func(producer)
self.assertEqual(mocked_stop.call_count, 0)
def test_cleanup_is_called_on_running_producer(self):
- producer = Producer(MagicMock(), async=True)
+ producer = Producer(MagicMock(), async_send=True)
producer.stopped = False
with patch.object(producer, 'stop') as mocked_stop:
producer._cleanup_func(producer)