From fb024355f07735d148c035dfd51b279d1b8e59df Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 4 Dec 2015 17:15:49 -0800 Subject: Cleanup new producer tests... --- test/test_producer.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'test') diff --git a/test/test_producer.py b/test/test_producer.py index 3c026e8..31282bf 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -111,19 +111,19 @@ class TestKafkaProducer(unittest.TestCase): with self.assertRaises(FailedPayloadsError): producer.send_messages('foobar', b'test message') - def test_cleanup_stop_is_called_on_not_stopped_object(self): + def test_cleanup_is_not_called_on_stopped_producer(self): producer = Producer(MagicMock(), async=True) producer.stopped = True - with patch('kafka.producer.base.Producer.stop') as base_stop: + with patch.object(producer, 'stop') as mocked_stop: producer._cleanup_func(producer) - self.assertEqual(base_stop.call_count, 0) + self.assertEqual(mocked_stop.call_count, 0) - def test_cleanup_stop_is_not_called_on_stopped_object(self): + def test_cleanup_is_called_on_running_producer(self): producer = Producer(MagicMock(), async=True) producer.stopped = False - with patch('kafka.producer.base.Producer.stop') as base_stop: + with patch.object(producer, 'stop') as mocked_stop: producer._cleanup_func(producer) - self.assertEqual(base_stop.call_count, 1) + self.assertEqual(mocked_stop.call_count, 1) class TestKafkaProducerSendUpstream(unittest.TestCase): -- cgit v1.2.1 From b687b4c5d4788d64efe9b7bcfb776e57d6fbcc8e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 5 Dec 2015 03:36:12 -0800 Subject: Use producer.stop() to flush messages in async producer test --- test/test_producer_integration.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'test') diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 46b6851..c99ed63 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -204,13 +204,11 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): resp = producer.send_messages(self.topic, self.msg("one")) self.assertEqual(len(resp), 0) - # wait for the server to report a new highwatermark - while self.current_offset(self.topic, partition) == start_offset: - time.sleep(0.1) + # flush messages + producer.stop() self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) - producer.stop() @kafka_versions("all") def test_batched_simple_producer__triggers_by_message(self): -- cgit v1.2.1