summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorZack Dever <zack.dever@rd.io>2015-12-07 13:37:30 -0800
committerZack Dever <zack.dever@rd.io>2015-12-07 13:37:30 -0800
commit753d8dca136178a4c2ecb0cda8d4ec371805455f (patch)
tree83225fb95731551cbb9c5a5aeb6fb08a3ec9f0ad /test
parentefc3d4f466c0d6630c9fff09fb1b90035c5351d7 (diff)
parenta678260d3622a0decd2d123ac0cfc445084eed60 (diff)
downloadkafka-python-753d8dca136178a4c2ecb0cda8d4ec371805455f.tar.gz
Merge branch 'master' into 0.9
Diffstat (limited to 'test')
-rw-r--r--test/test_producer.py12
-rw-r--r--test/test_producer_integration.py6
2 files changed, 8 insertions, 10 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
index 3c026e8..31282bf 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -111,19 +111,19 @@ class TestKafkaProducer(unittest.TestCase):
with self.assertRaises(FailedPayloadsError):
producer.send_messages('foobar', b'test message')
- def test_cleanup_stop_is_called_on_not_stopped_object(self):
+ def test_cleanup_is_not_called_on_stopped_producer(self):
producer = Producer(MagicMock(), async=True)
producer.stopped = True
- with patch('kafka.producer.base.Producer.stop') as base_stop:
+ with patch.object(producer, 'stop') as mocked_stop:
producer._cleanup_func(producer)
- self.assertEqual(base_stop.call_count, 0)
+ self.assertEqual(mocked_stop.call_count, 0)
- def test_cleanup_stop_is_not_called_on_stopped_object(self):
+ def test_cleanup_is_called_on_running_producer(self):
producer = Producer(MagicMock(), async=True)
producer.stopped = False
- with patch('kafka.producer.base.Producer.stop') as base_stop:
+ with patch.object(producer, 'stop') as mocked_stop:
producer._cleanup_func(producer)
- self.assertEqual(base_stop.call_count, 1)
+ self.assertEqual(mocked_stop.call_count, 1)
class TestKafkaProducerSendUpstream(unittest.TestCase):
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 46b6851..c99ed63 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -204,13 +204,11 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
resp = producer.send_messages(self.topic, self.msg("one"))
self.assertEqual(len(resp), 0)
- # wait for the server to report a new highwatermark
- while self.current_offset(self.topic, partition) == start_offset:
- time.sleep(0.1)
+ # flush messages
+ producer.stop()
self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
- producer.stop()
@kafka_versions("all")
def test_batched_simple_producer__triggers_by_message(self):