diff options
author | Dana Powers <dana.powers@gmail.com> | 2018-03-23 05:58:55 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-23 05:58:55 -0700 |
commit | b62006aeb86258b4b1ef2735bebb1fe99459b82d (patch) | |
tree | 9a5cab083163b8e5d952d1bb0f3bb7141ffb746a /test | |
parent | 204388b0928c02a339eb84b376c74851eb074e69 (diff) | |
download | kafka-python-b62006aeb86258b4b1ef2735bebb1fe99459b82d.tar.gz |
Change SimpleProducer to use async_send (async is reserved in py37) (#1454)
Diffstat (limited to 'test')
-rw-r--r-- | test/test_failover_integration.py | 8 | ||||
-rw-r--r-- | test/test_producer_integration.py | 8 | ||||
-rw-r--r-- | test/test_producer_legacy.py | 10 |
3 files changed, 13 insertions, 13 deletions
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 8531cfb..797e1c8 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -60,7 +60,7 @@ class TestFailover(KafkaIntegrationTestCase): # require that the server commit messages to all in-sync replicas # so that failover doesn't lose any messages on server-side # and we can assert that server-side message count equals client-side - producer = Producer(self.client, async=False, + producer = Producer(self.client, async_send=False, req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT) # Send 100 random messages to a specific partition @@ -101,7 +101,7 @@ class TestFailover(KafkaIntegrationTestCase): partition = 0 # Test the base class Producer -- send_messages to a specific partition - producer = Producer(self.client, async=True, + producer = Producer(self.client, async_send=True, batch_send_every_n=15, batch_send_every_t=3, req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT, @@ -146,7 +146,7 @@ class TestFailover(KafkaIntegrationTestCase): def test_switch_leader_keyed_producer(self): topic = self.topic - producer = KeyedProducer(self.client, async=False) + producer = KeyedProducer(self.client, async_send=False) # Send 10 random messages for _ in range(10): @@ -182,7 +182,7 @@ class TestFailover(KafkaIntegrationTestCase): producer.send_messages(topic, key, msg) def test_switch_leader_simple_consumer(self): - producer = Producer(self.client, async=False) + producer = Producer(self.client, async_send=False) consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10) self._send_random_messages(producer, self.topic, 0, 2) consumer.get_messages() diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 6cd3d13..2b81047 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -216,7 +216,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): partition = self.client.get_partition_ids_for_topic(self.topic)[0] start_offset = self.current_offset(self.topic, partition) - producer = SimpleProducer(self.client, async=True, random_start=False) + producer = SimpleProducer(self.client, async_send=True, random_start=False) resp = producer.send_messages(self.topic, self.msg("one")) self.assertEqual(len(resp), 0) @@ -235,7 +235,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): batch_interval = 5 producer = SimpleProducer( self.client, - async=True, + async_send=True, batch_send_every_n=batch_messages, batch_send_every_t=batch_interval, random_start=False) @@ -300,7 +300,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): batch_interval = 5 producer = SimpleProducer( self.client, - async=True, + async_send=True, batch_send_every_n=100, batch_send_every_t=batch_interval, random_start=False) @@ -432,7 +432,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner, - async=True, + async_send=True, batch_send_every_t=1) resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py index 9b87c76..6d00116 100644 --- a/test/test_producer_legacy.py +++ b/test/test_producer_legacy.py @@ -73,7 +73,7 @@ class TestKafkaProducer(unittest.TestCase): @patch('kafka.producer.base._send_upstream') def test_producer_async_queue_overfilled(self, mock): queue_size = 2 - producer = Producer(MagicMock(), async=True, + producer = Producer(MagicMock(), async_send=True, async_queue_maxsize=queue_size) topic = b'test-topic' @@ -95,25 +95,25 @@ class TestKafkaProducer(unittest.TestCase): with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]): client = SimpleClient(MagicMock()) - producer = SimpleProducer(client, async=False, sync_fail_on_error=False) + producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False) # This should not raise (response,) = producer.send_messages('foobar', b'test message') self.assertEqual(response, error) - producer = SimpleProducer(client, async=False, sync_fail_on_error=True) + producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True) with self.assertRaises(FailedPayloadsError): producer.send_messages('foobar', b'test message') def test_cleanup_is_not_called_on_stopped_producer(self): - producer = Producer(MagicMock(), async=True) + producer = Producer(MagicMock(), async_send=True) producer.stopped = True with patch.object(producer, 'stop') as mocked_stop: producer._cleanup_func(producer) self.assertEqual(mocked_stop.call_count, 0) def test_cleanup_is_called_on_running_producer(self): - producer = Producer(MagicMock(), async=True) + producer = Producer(MagicMock(), async_send=True) producer.stopped = False with patch.object(producer, 'stop') as mocked_stop: producer._cleanup_func(producer) |