summaryrefslogtreecommitdiff
path: root/ceilometer/tests
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2015-07-31 09:00:20 +0200
committerMehdi Abaakouk <sileht@redhat.com>2015-08-06 11:53:11 +0200
commitfb0601a90d4314af7b722fa076fc6b90664b7676 (patch)
tree49c6285e26ff0701aea7698d9c049cdce31f2cd3 /ceilometer/tests
parent49f53f35a5b7d616517b23e75b18e159ece41d4a (diff)
downloadceilometer-fb0601a90d4314af7b722fa076fc6b90664b7676.tar.gz
Fixes the kafka publisher
The kafka publusher is not concurrency safe at all. And the sample/event payload cannot be serialized correctly To fix that: * the code now is shared with the messaging one. * the connection to kafka is done before sending messaging to not touch the queue * use jsonutils to serialize samples Change-Id: I3fb731d2eb33cbfba38c5165ce9874af89072e34 Closes-bug: #1479976
Diffstat (limited to 'ceilometer/tests')
-rw-r--r--ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py63
-rw-r--r--ceilometer/tests/unit/publisher/test_messaging_publisher.py23
2 files changed, 43 insertions, 43 deletions
diff --git a/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py b/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py
index d7ea0ba9..9daaaef1 100644
--- a/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py
+++ b/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py
@@ -22,12 +22,14 @@ from oslo_utils import netutils
from ceilometer.event.storage import models as event
from ceilometer.publisher import kafka_broker as kafka
+from ceilometer.publisher import messaging as msg_publisher
from ceilometer import sample
from ceilometer.tests import base as tests_base
@mock.patch('ceilometer.publisher.kafka_broker.LOG', mock.Mock())
-@mock.patch.object(kafka.KafkaBrokerPublisher, '_get_client', mock.Mock())
+@mock.patch('ceilometer.publisher.kafka_broker.kafka.KafkaClient',
+ mock.Mock())
class TestKafkaPublisher(tests_base.BaseTestCase):
test_event_data = [
event.Event(message_id=uuid.uuid4(),
@@ -95,25 +97,22 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
),
]
- def setUp(self):
- super(TestKafkaPublisher, self).setUp()
-
def test_publish(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer'))
- with mock.patch.object(publisher, '_send') as fake_send:
+ with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_samples(mock.MagicMock(), self.test_data)
- self.assertEqual(1, len(fake_send.mock_calls))
+ self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
def test_publish_without_options(self):
publisher = kafka.KafkaBrokerPublisher(
netutils.urlsplit('kafka://127.0.0.1:9092'))
- with mock.patch.object(publisher, '_send') as fake_send:
+ with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_samples(mock.MagicMock(), self.test_data)
- self.assertEqual(1, len(fake_send.mock_calls))
+ self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
def test_publish_to_host_without_policy(self):
@@ -129,39 +128,40 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=default'))
- with mock.patch.object(publisher, '_send') as fake_send:
- fake_send.side_effect = TypeError
- self.assertRaises(TypeError, publisher.publish_samples,
+ with mock.patch.object(publisher, '_producer') as fake_producer:
+ fake_producer.send_messages.side_effect = TypeError
+ self.assertRaises(msg_publisher.DeliveryFailure,
+ publisher.publish_samples,
mock.MagicMock(), self.test_data)
- self.assertEqual(100, len(fake_send.mock_calls))
+ self.assertEqual(100, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
def test_publish_to_host_with_drop_policy(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=drop'))
- with mock.patch.object(publisher, '_send') as fake_send:
- fake_send.side_effect = Exception("test")
+ with mock.patch.object(publisher, '_producer') as fake_producer:
+ fake_producer.send_messages.side_effect = Exception("test")
publisher.publish_samples(mock.MagicMock(), self.test_data)
- self.assertEqual(1, len(fake_send.mock_calls))
+ self.assertEqual(1, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
def test_publish_to_host_with_queue_policy(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
- with mock.patch.object(publisher, '_send') as fake_send:
- fake_send.side_effect = Exception("test")
+ with mock.patch.object(publisher, '_producer') as fake_producer:
+ fake_producer.send_messages.side_effect = Exception("test")
publisher.publish_samples(mock.MagicMock(), self.test_data)
- self.assertEqual(1, len(fake_send.mock_calls))
+ self.assertEqual(1, len(fake_producer.send_messages.mock_calls))
self.assertEqual(1, len(publisher.local_queue))
def test_publish_to_down_host_with_default_queue_size(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
- with mock.patch.object(publisher, '_send') as fake_send:
- fake_send.side_effect = Exception('No Connection')
+ with mock.patch.object(publisher, '_producer') as fake_producer:
+ fake_producer.send_messages.side_effect = Exception("test")
for i in range(0, 2000):
for s in self.test_data:
@@ -170,16 +170,16 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
self.assertEqual(1024, len(publisher.local_queue))
self.assertEqual('test-976',
- publisher.local_queue[0][0]['counter_name'])
+ publisher.local_queue[0][2][0]['counter_name'])
self.assertEqual('test-1999',
- publisher.local_queue[1023][0]['counter_name'])
+ publisher.local_queue[1023][2][0]['counter_name'])
def test_publish_to_host_from_down_to_up_with_queue(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
- with mock.patch.object(publisher, '_send') as fake_send:
- fake_send.side_effect = Exception('No Connection')
+ with mock.patch.object(publisher, '_producer') as fake_producer:
+ fake_producer.send_messages.side_effect = Exception("test")
for i in range(0, 16):
for s in self.test_data:
s.name = 'test-%d' % i
@@ -187,7 +187,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
self.assertEqual(16, len(publisher.local_queue))
- fake_send.side_effect = None
+ fake_producer.send_messages.side_effect = None
for s in self.test_data:
s.name = 'test-%d' % 16
publisher.publish_samples(mock.MagicMock(), self.test_data)
@@ -197,13 +197,14 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
publisher = kafka.KafkaBrokerPublisher(
netutils.urlsplit('kafka://127.0.0.1:9092?topic=ceilometer'))
- with mock.patch.object(publisher, '_send') as fake_send:
+ with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_events(mock.MagicMock(), self.test_event_data)
- self.assertEqual(1, len(fake_send.mock_calls))
+ self.assertEqual(5, len(fake_producer.send_messages.mock_calls))
- with mock.patch.object(publisher, '_send') as fake_send:
- fake_send.side_effect = TypeError
- self.assertRaises(TypeError, publisher.publish_events,
+ with mock.patch.object(publisher, '_producer') as fake_producer:
+ fake_producer.send_messages.side_effect = Exception("test")
+ self.assertRaises(msg_publisher.DeliveryFailure,
+ publisher.publish_events,
mock.MagicMock(), self.test_event_data)
- self.assertEqual(100, len(fake_send.mock_calls))
+ self.assertEqual(100, len(fake_producer.send_messages.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
diff --git a/ceilometer/tests/unit/publisher/test_messaging_publisher.py b/ceilometer/tests/unit/publisher/test_messaging_publisher.py
index 23420307..3bd26487 100644
--- a/ceilometer/tests/unit/publisher/test_messaging_publisher.py
+++ b/ceilometer/tests/unit/publisher/test_messaging_publisher.py
@@ -21,7 +21,6 @@ import eventlet
import mock
from oslo_config import fixture as fixture_config
from oslo_context import context
-import oslo_messaging
from oslo_utils import netutils
import testscenarios.testcase
@@ -250,11 +249,11 @@ class TestPublisherPolicy(TestPublisher):
def test_published_with_no_policy(self, mylog):
publisher = self.publisher_cls(
netutils.urlsplit('%s://' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
- oslo_messaging.MessageDeliveryFailure,
+ msg_publisher.DeliveryFailure,
getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data)
self.assertTrue(mylog.info.called)
@@ -267,11 +266,11 @@ class TestPublisherPolicy(TestPublisher):
def test_published_with_policy_block(self, mylog):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=default' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
- oslo_messaging.MessageDeliveryFailure,
+ msg_publisher.DeliveryFailure,
getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data)
self.assertTrue(mylog.info.called)
@@ -283,11 +282,11 @@ class TestPublisherPolicy(TestPublisher):
def test_published_with_policy_incorrect(self, mylog):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=notexist' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
self.assertRaises(
- oslo_messaging.MessageDeliveryFailure,
+ msg_publisher.DeliveryFailure,
getattr(publisher, self.pub_func),
mock.MagicMock(), self.test_data)
self.assertTrue(mylog.warn.called)
@@ -303,7 +302,7 @@ class TestPublisherPolicyReactions(TestPublisher):
def test_published_with_policy_drop_and_rpc_down(self):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=drop' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
getattr(publisher, self.pub_func)(mock.MagicMock(),
@@ -315,7 +314,7 @@ class TestPublisherPolicyReactions(TestPublisher):
def test_published_with_policy_queue_and_rpc_down(self):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=queue' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
@@ -330,7 +329,7 @@ class TestPublisherPolicyReactions(TestPublisher):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=queue' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
getattr(publisher, self.pub_func)(mock.MagicMock(),
@@ -354,7 +353,7 @@ class TestPublisherPolicyReactions(TestPublisher):
publisher = self.publisher_cls(netutils.urlsplit(
'%s://?policy=queue&max_queue_length=3' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
for i in range(0, 5):
@@ -381,7 +380,7 @@ class TestPublisherPolicyReactions(TestPublisher):
publisher = self.publisher_cls(
netutils.urlsplit('%s://?policy=queue' % self.protocol))
- side_effect = oslo_messaging.MessageDeliveryFailure()
+ side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
fake_send.side_effect = side_effect
for i in range(0, 2000):