summaryrefslogtreecommitdiff
path: root/ceilometer/tests/unit/publisher/test_messaging_publisher.py
diff options
context:
space:
mode:
Diffstat (limited to 'ceilometer/tests/unit/publisher/test_messaging_publisher.py')
-rw-r--r--ceilometer/tests/unit/publisher/test_messaging_publisher.py47
1 files changed, 44 insertions, 3 deletions
diff --git a/ceilometer/tests/unit/publisher/test_messaging_publisher.py b/ceilometer/tests/unit/publisher/test_messaging_publisher.py
index 203a48bc..6be8ada3 100644
--- a/ceilometer/tests/unit/publisher/test_messaging_publisher.py
+++ b/ceilometer/tests/unit/publisher/test_messaging_publisher.py
@@ -18,6 +18,8 @@ import datetime
import uuid
import mock
+import oslo_messaging
+from oslo_messaging._drivers import impl_kafka as kafka_driver
from oslo_utils import netutils
import testscenarios.testcase
@@ -147,6 +149,42 @@ class NotifierOnlyPublisherTest(BasePublisherTestCase):
cgt.assert_called_with(self.CONF, 'amqp://foo:foo@127.0.0.1:1234/foo'
'?amqp_auto_delete=true')
+ @mock.patch('ceilometer.messaging.get_transport')
+ def test_publish_with_none_rabbit_driver(self, cgt):
+ sample_publisher = msg_publisher.SampleNotifierPublisher(
+ self.CONF,
+ netutils.urlsplit('notifier://127.0.0.1:9092?driver=kafka'))
+ cgt.assert_called_with(self.CONF, 'kafka://127.0.0.1:9092')
+ transport = oslo_messaging.get_transport(self.CONF,
+ 'kafka://127.0.0.1:9092')
+ self.assertIsInstance(transport._driver, kafka_driver.KafkaDriver)
+
+ side_effect = msg_publisher.DeliveryFailure()
+ with mock.patch.object(sample_publisher, '_send') as fake_send:
+ fake_send.side_effect = side_effect
+ self.assertRaises(
+ msg_publisher.DeliveryFailure,
+ sample_publisher.publish_samples,
+ self.test_sample_data)
+ self.assertEqual(0, len(sample_publisher.local_queue))
+ self.assertEqual(100, len(fake_send.mock_calls))
+ fake_send.assert_called_with('metering', mock.ANY)
+
+ event_publisher = msg_publisher.EventNotifierPublisher(
+ self.CONF,
+ netutils.urlsplit('notifier://127.0.0.1:9092?driver=kafka'))
+ cgt.assert_called_with(self.CONF, 'kafka://127.0.0.1:9092')
+
+ with mock.patch.object(event_publisher, '_send') as fake_send:
+ fake_send.side_effect = side_effect
+ self.assertRaises(
+ msg_publisher.DeliveryFailure,
+ event_publisher.publish_events,
+ self.test_event_data)
+ self.assertEqual(0, len(event_publisher.local_queue))
+ self.assertEqual(100, len(fake_send.mock_calls))
+ fake_send.assert_called_with('event', mock.ANY)
+
class TestPublisher(testscenarios.testcase.WithScenarios,
BasePublisherTestCase):
@@ -186,7 +224,8 @@ class TestPublisherPolicy(TestPublisher):
self.assertTrue(mylog.info.called)
self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue))
- fake_send.assert_called_once_with(
+ self.assertEqual(100, len(fake_send.mock_calls))
+ fake_send.assert_called_with(
self.topic, mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG')
@@ -203,7 +242,8 @@ class TestPublisherPolicy(TestPublisher):
self.test_data)
self.assertTrue(mylog.info.called)
self.assertEqual(0, len(publisher.local_queue))
- fake_send.assert_called_once_with(
+ self.assertEqual(100, len(fake_send.mock_calls))
+ fake_send.assert_called_with(
self.topic, mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG')
@@ -221,7 +261,8 @@ class TestPublisherPolicy(TestPublisher):
self.assertTrue(mylog.warning.called)
self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue))
- fake_send.assert_called_once_with(
+ self.assertEqual(100, len(fake_send.mock_calls))
+ fake_send.assert_called_with(
self.topic, mock.ANY)