diff options
author | Zack Dever <zack.dever@rd.io> | 2015-12-07 13:37:30 -0800 |
---|---|---|
committer | Zack Dever <zack.dever@rd.io> | 2015-12-07 13:37:30 -0800 |
commit | 753d8dca136178a4c2ecb0cda8d4ec371805455f (patch) | |
tree | 83225fb95731551cbb9c5a5aeb6fb08a3ec9f0ad /test | |
parent | efc3d4f466c0d6630c9fff09fb1b90035c5351d7 (diff) | |
parent | a678260d3622a0decd2d123ac0cfc445084eed60 (diff) | |
download | kafka-python-753d8dca136178a4c2ecb0cda8d4ec371805455f.tar.gz |
Merge branch 'master' into 0.9
Diffstat (limited to 'test')
-rw-r--r-- | test/test_producer.py | 12 | ||||
-rw-r--r-- | test/test_producer_integration.py | 6 |
2 files changed, 8 insertions, 10 deletions
diff --git a/test/test_producer.py b/test/test_producer.py index 3c026e8..31282bf 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -111,19 +111,19 @@ class TestKafkaProducer(unittest.TestCase): with self.assertRaises(FailedPayloadsError): producer.send_messages('foobar', b'test message') - def test_cleanup_stop_is_called_on_not_stopped_object(self): + def test_cleanup_is_not_called_on_stopped_producer(self): producer = Producer(MagicMock(), async=True) producer.stopped = True - with patch('kafka.producer.base.Producer.stop') as base_stop: + with patch.object(producer, 'stop') as mocked_stop: producer._cleanup_func(producer) - self.assertEqual(base_stop.call_count, 0) + self.assertEqual(mocked_stop.call_count, 0) - def test_cleanup_stop_is_not_called_on_stopped_object(self): + def test_cleanup_is_called_on_running_producer(self): producer = Producer(MagicMock(), async=True) producer.stopped = False - with patch('kafka.producer.base.Producer.stop') as base_stop: + with patch.object(producer, 'stop') as mocked_stop: producer._cleanup_func(producer) - self.assertEqual(base_stop.call_count, 1) + self.assertEqual(mocked_stop.call_count, 1) class TestKafkaProducerSendUpstream(unittest.TestCase): diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 46b6851..c99ed63 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -204,13 +204,11 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): resp = producer.send_messages(self.topic, self.msg("one")) self.assertEqual(len(resp), 0) - # wait for the server to report a new highwatermark - while self.current_offset(self.topic, partition) == start_offset: - time.sleep(0.1) + # flush messages + producer.stop() self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) - producer.stop() @kafka_versions("all") def test_batched_simple_producer__triggers_by_message(self): |