diff options
author | Mehdi Abaakouk <sileht@redhat.com> | 2015-07-31 09:00:20 +0200 |
---|---|---|
committer | Mehdi Abaakouk <sileht@redhat.com> | 2015-08-06 11:53:11 +0200 |
commit | fb0601a90d4314af7b722fa076fc6b90664b7676 (patch) | |
tree | 49c6285e26ff0701aea7698d9c049cdce31f2cd3 /ceilometer/tests/unit/publisher/test_messaging_publisher.py | |
parent | 49f53f35a5b7d616517b23e75b18e159ece41d4a (diff) | |
download | ceilometer-fb0601a90d4314af7b722fa076fc6b90664b7676.tar.gz |
Fixes the kafka publisher
The kafka publusher is not concurrency safe at all.
And the sample/event payload cannot be serialized correctly
To fix that:
* the code now is shared with the messaging one.
* the connection to kafka is done before sending messaging to not touch
the queue
* use jsonutils to serialize samples
Change-Id: I3fb731d2eb33cbfba38c5165ce9874af89072e34
Closes-bug: #1479976
Diffstat (limited to 'ceilometer/tests/unit/publisher/test_messaging_publisher.py')
-rw-r--r-- | ceilometer/tests/unit/publisher/test_messaging_publisher.py | 23 |
1 files changed, 11 insertions, 12 deletions
diff --git a/ceilometer/tests/unit/publisher/test_messaging_publisher.py b/ceilometer/tests/unit/publisher/test_messaging_publisher.py index 23420307..3bd26487 100644 --- a/ceilometer/tests/unit/publisher/test_messaging_publisher.py +++ b/ceilometer/tests/unit/publisher/test_messaging_publisher.py @@ -21,7 +21,6 @@ import eventlet import mock from oslo_config import fixture as fixture_config from oslo_context import context -import oslo_messaging from oslo_utils import netutils import testscenarios.testcase @@ -250,11 +249,11 @@ class TestPublisherPolicy(TestPublisher): def test_published_with_no_policy(self, mylog): publisher = self.publisher_cls( netutils.urlsplit('%s://' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect self.assertRaises( - oslo_messaging.MessageDeliveryFailure, + msg_publisher.DeliveryFailure, getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) self.assertTrue(mylog.info.called) @@ -267,11 +266,11 @@ class TestPublisherPolicy(TestPublisher): def test_published_with_policy_block(self, mylog): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=default' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect self.assertRaises( - oslo_messaging.MessageDeliveryFailure, + msg_publisher.DeliveryFailure, getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) self.assertTrue(mylog.info.called) @@ -283,11 +282,11 @@ class TestPublisherPolicy(TestPublisher): def test_published_with_policy_incorrect(self, mylog): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=notexist' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect self.assertRaises( - oslo_messaging.MessageDeliveryFailure, + msg_publisher.DeliveryFailure, getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) self.assertTrue(mylog.warn.called) @@ -303,7 +302,7 @@ class TestPublisherPolicyReactions(TestPublisher): def test_published_with_policy_drop_and_rpc_down(self): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=drop' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect getattr(publisher, self.pub_func)(mock.MagicMock(), @@ -315,7 +314,7 @@ class TestPublisherPolicyReactions(TestPublisher): def test_published_with_policy_queue_and_rpc_down(self): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=queue' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect @@ -330,7 +329,7 @@ class TestPublisherPolicyReactions(TestPublisher): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=queue' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect getattr(publisher, self.pub_func)(mock.MagicMock(), @@ -354,7 +353,7 @@ class TestPublisherPolicyReactions(TestPublisher): publisher = self.publisher_cls(netutils.urlsplit( '%s://?policy=queue&max_queue_length=3' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect for i in range(0, 5): @@ -381,7 +380,7 @@ class TestPublisherPolicyReactions(TestPublisher): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=queue' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect for i in range(0, 2000): |