summaryrefslogtreecommitdiff
path: root/ceilometer/tests/unit/publisher/test_messaging_publisher.py
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2015-07-31 09:00:20 +0200
committerMehdi Abaakouk <sileht@redhat.com>2015-08-06 11:53:11 +0200
commitfb0601a90d4314af7b722fa076fc6b90664b7676 (patch)
tree49c6285e26ff0701aea7698d9c049cdce31f2cd3 /ceilometer/tests/unit/publisher/test_messaging_publisher.py
parent49f53f35a5b7d616517b23e75b18e159ece41d4a (diff)
downloadceilometer-fb0601a90d4314af7b722fa076fc6b90664b7676.tar.gz
Fixes the kafka publisher
The kafka publusher is not concurrency safe at all. And the sample/event payload cannot be serialized correctly To fix that: * the code now is shared with the messaging one. * the connection to kafka is done before sending messaging to not touch the queue * use jsonutils to serialize samples Change-Id: I3fb731d2eb33cbfba38c5165ce9874af89072e34 Closes-bug: #1479976
Diffstat (limited to 'ceilometer/tests/unit/publisher/test_messaging_publisher.py')
-rw-r--r--ceilometer/tests/unit/publisher/test_messaging_publisher.py23
1 files changed, 11 insertions, 12 deletions
diff --git a/ceilometer/tests/unit/publisher/test_messaging_publisher.py b/ceilometer/tests/unit/publisher/test_messaging_publisher.py
index 23420307..3bd26487 100644
--- a/ceilometer/tests/unit/publisher/test_messaging_publisher.py
+++ b/ceilometer/tests/unit/publisher/test_messaging_publisher.py
@@ -21,7 +21,6 @@ import eventlet
import mock
from oslo_config import fixture as fixture_config
from oslo_context import context
-import oslo_messaging
from oslo_utils import netutils
import testscenarios.testcase
@@ -250,11 +249,11 @@ class TestPublisherPolicy(TestPublisher):
def test_published_with_no_policy(self, mylog):
publisher = self.publisher_cls(
netutils.urlsplit('%s://' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
- oslo_messaging.MessageDeliveryFailure,
+ msg_publisher.DeliveryFailure,
getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data)
self.assertTrue(mylog.info.called)
@@ -267,11 +266,11 @@ class TestPublisherPolicy(TestPublisher):
def test_published_with_policy_block(self, mylog):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=default' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
- oslo_messaging.MessageDeliveryFailure,
+ msg_publisher.DeliveryFailure,
getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data)
self.assertTrue(mylog.info.called)
@@ -283,11 +282,11 @@ class TestPublisherPolicy(TestPublisher):
def test_published_with_policy_incorrect(self, mylog):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=notexist' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
- oslo_messaging.MessageDeliveryFailure,
+ msg_publisher.DeliveryFailure,
getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data)
self.assertTrue(mylog.warn.called)
@@ -303,7 +302,7 @@ class TestPublisherPolicyReactions(TestPublisher):
def test_published_with_policy_drop_and_rpc_down(self):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=drop' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
getattr(publisher, self.pub_func)(mock.MagicMock(),
@@ -315,7 +314,7 @@ class TestPublisherPolicyReactions(TestPublisher):
def test_published_with_policy_queue_and_rpc_down(self):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=queue' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
@@ -330,7 +329,7 @@ class TestPublisherPolicyReactions(TestPublisher):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=queue' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
getattr(publisher, self.pub_func)(mock.MagicMock(),
@@ -354,7 +353,7 @@ class TestPublisherPolicyReactions(TestPublisher):
publisher = self.publisher_cls(netutils.urlsplit(
'%s://?policy=queue&max_queue_length=3' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
for i in range(0, 5):
@@ -381,7 +380,7 @@ class TestPublisherPolicyReactions(TestPublisher):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=queue' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
for i in range(0, 2000):