diff options
author | Mehdi Abaakouk <sileht@redhat.com> | 2015-07-31 09:00:20 +0200 |
---|---|---|
committer | Mehdi Abaakouk <sileht@redhat.com> | 2015-08-06 11:53:11 +0200 |
commit | fb0601a90d4314af7b722fa076fc6b90664b7676 (patch) | |
tree | 49c6285e26ff0701aea7698d9c049cdce31f2cd3 /ceilometer/tests | |
parent | 49f53f35a5b7d616517b23e75b18e159ece41d4a (diff) | |
download | ceilometer-fb0601a90d4314af7b722fa076fc6b90664b7676.tar.gz |
Fixes the kafka publisher
The kafka publusher is not concurrency safe at all.
And the sample/event payload cannot be serialized correctly
To fix that:
* the code now is shared with the messaging one.
* the connection to kafka is done before sending messaging to not touch
the queue
* use jsonutils to serialize samples
Change-Id: I3fb731d2eb33cbfba38c5165ce9874af89072e34
Closes-bug: #1479976
Diffstat (limited to 'ceilometer/tests')
-rw-r--r-- | ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py | 63 | ||||
-rw-r--r-- | ceilometer/tests/unit/publisher/test_messaging_publisher.py | 23 |
2 files changed, 43 insertions, 43 deletions
diff --git a/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py b/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py index d7ea0ba9..9daaaef1 100644 --- a/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py +++ b/ceilometer/tests/unit/publisher/test_kafka_broker_publisher.py @@ -22,12 +22,14 @@ from oslo_utils import netutils from ceilometer.event.storage import models as event from ceilometer.publisher import kafka_broker as kafka +from ceilometer.publisher import messaging as msg_publisher from ceilometer import sample from ceilometer.tests import base as tests_base @mock.patch('ceilometer.publisher.kafka_broker.LOG', mock.Mock()) -@mock.patch.object(kafka.KafkaBrokerPublisher, '_get_client', mock.Mock()) +@mock.patch('ceilometer.publisher.kafka_broker.kafka.KafkaClient', + mock.Mock()) class TestKafkaPublisher(tests_base.BaseTestCase): test_event_data = [ event.Event(message_id=uuid.uuid4(), @@ -95,25 +97,22 @@ class TestKafkaPublisher(tests_base.BaseTestCase): ), ] - def setUp(self): - super(TestKafkaPublisher, self).setUp() - def test_publish(self): publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit( 'kafka://127.0.0.1:9092?topic=ceilometer')) - with mock.patch.object(publisher, '_send') as fake_send: + with mock.patch.object(publisher, '_producer') as fake_producer: publisher.publish_samples(mock.MagicMock(), self.test_data) - self.assertEqual(1, len(fake_send.mock_calls)) + self.assertEqual(5, len(fake_producer.send_messages.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) def test_publish_without_options(self): publisher = kafka.KafkaBrokerPublisher( netutils.urlsplit('kafka://127.0.0.1:9092')) - with mock.patch.object(publisher, '_send') as fake_send: + with mock.patch.object(publisher, '_producer') as fake_producer: publisher.publish_samples(mock.MagicMock(), self.test_data) - self.assertEqual(1, len(fake_send.mock_calls)) + self.assertEqual(5, len(fake_producer.send_messages.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) def test_publish_to_host_without_policy(self): @@ -129,39 +128,40 @@ class TestKafkaPublisher(tests_base.BaseTestCase): publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit( 'kafka://127.0.0.1:9092?topic=ceilometer&policy=default')) - with mock.patch.object(publisher, '_send') as fake_send: - fake_send.side_effect = TypeError - self.assertRaises(TypeError, publisher.publish_samples, + with mock.patch.object(publisher, '_producer') as fake_producer: + fake_producer.send_messages.side_effect = TypeError + self.assertRaises(msg_publisher.DeliveryFailure, + publisher.publish_samples, mock.MagicMock(), self.test_data) - self.assertEqual(100, len(fake_send.mock_calls)) + self.assertEqual(100, len(fake_producer.send_messages.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) def test_publish_to_host_with_drop_policy(self): publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit( 'kafka://127.0.0.1:9092?topic=ceilometer&policy=drop')) - with mock.patch.object(publisher, '_send') as fake_send: - fake_send.side_effect = Exception("test") + with mock.patch.object(publisher, '_producer') as fake_producer: + fake_producer.send_messages.side_effect = Exception("test") publisher.publish_samples(mock.MagicMock(), self.test_data) - self.assertEqual(1, len(fake_send.mock_calls)) + self.assertEqual(1, len(fake_producer.send_messages.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) def test_publish_to_host_with_queue_policy(self): publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit( 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue')) - with mock.patch.object(publisher, '_send') as fake_send: - fake_send.side_effect = Exception("test") + with mock.patch.object(publisher, '_producer') as fake_producer: + fake_producer.send_messages.side_effect = Exception("test") publisher.publish_samples(mock.MagicMock(), self.test_data) - self.assertEqual(1, len(fake_send.mock_calls)) + self.assertEqual(1, len(fake_producer.send_messages.mock_calls)) self.assertEqual(1, len(publisher.local_queue)) def test_publish_to_down_host_with_default_queue_size(self): publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit( 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue')) - with mock.patch.object(publisher, '_send') as fake_send: - fake_send.side_effect = Exception('No Connection') + with mock.patch.object(publisher, '_producer') as fake_producer: + fake_producer.send_messages.side_effect = Exception("test") for i in range(0, 2000): for s in self.test_data: @@ -170,16 +170,16 @@ class TestKafkaPublisher(tests_base.BaseTestCase): self.assertEqual(1024, len(publisher.local_queue)) self.assertEqual('test-976', - publisher.local_queue[0][0]['counter_name']) + publisher.local_queue[0][2][0]['counter_name']) self.assertEqual('test-1999', - publisher.local_queue[1023][0]['counter_name']) + publisher.local_queue[1023][2][0]['counter_name']) def test_publish_to_host_from_down_to_up_with_queue(self): publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit( 'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue')) - with mock.patch.object(publisher, '_send') as fake_send: - fake_send.side_effect = Exception('No Connection') + with mock.patch.object(publisher, '_producer') as fake_producer: + fake_producer.send_messages.side_effect = Exception("test") for i in range(0, 16): for s in self.test_data: s.name = 'test-%d' % i @@ -187,7 +187,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase): self.assertEqual(16, len(publisher.local_queue)) - fake_send.side_effect = None + fake_producer.send_messages.side_effect = None for s in self.test_data: s.name = 'test-%d' % 16 publisher.publish_samples(mock.MagicMock(), self.test_data) @@ -197,13 +197,14 @@ class TestKafkaPublisher(tests_base.BaseTestCase): publisher = kafka.KafkaBrokerPublisher( netutils.urlsplit('kafka://127.0.0.1:9092?topic=ceilometer')) - with mock.patch.object(publisher, '_send') as fake_send: + with mock.patch.object(publisher, '_producer') as fake_producer: publisher.publish_events(mock.MagicMock(), self.test_event_data) - self.assertEqual(1, len(fake_send.mock_calls)) + self.assertEqual(5, len(fake_producer.send_messages.mock_calls)) - with mock.patch.object(publisher, '_send') as fake_send: - fake_send.side_effect = TypeError - self.assertRaises(TypeError, publisher.publish_events, + with mock.patch.object(publisher, '_producer') as fake_producer: + fake_producer.send_messages.side_effect = Exception("test") + self.assertRaises(msg_publisher.DeliveryFailure, + publisher.publish_events, mock.MagicMock(), self.test_event_data) - self.assertEqual(100, len(fake_send.mock_calls)) + self.assertEqual(100, len(fake_producer.send_messages.mock_calls)) self.assertEqual(0, len(publisher.local_queue)) diff --git a/ceilometer/tests/unit/publisher/test_messaging_publisher.py b/ceilometer/tests/unit/publisher/test_messaging_publisher.py index 23420307..3bd26487 100644 --- a/ceilometer/tests/unit/publisher/test_messaging_publisher.py +++ b/ceilometer/tests/unit/publisher/test_messaging_publisher.py @@ -21,7 +21,6 @@ import eventlet import mock from oslo_config import fixture as fixture_config from oslo_context import context -import oslo_messaging from oslo_utils import netutils import testscenarios.testcase @@ -250,11 +249,11 @@ class TestPublisherPolicy(TestPublisher): def test_published_with_no_policy(self, mylog): publisher = self.publisher_cls( netutils.urlsplit('%s://' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect self.assertRaises( - oslo_messaging.MessageDeliveryFailure, + msg_publisher.DeliveryFailure, getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) self.assertTrue(mylog.info.called) @@ -267,11 +266,11 @@ class TestPublisherPolicy(TestPublisher): def test_published_with_policy_block(self, mylog): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=default' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect self.assertRaises( - oslo_messaging.MessageDeliveryFailure, + msg_publisher.DeliveryFailure, getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) self.assertTrue(mylog.info.called) @@ -283,11 +282,11 @@ class TestPublisherPolicy(TestPublisher): def test_published_with_policy_incorrect(self, mylog): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=notexist' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect self.assertRaises( - oslo_messaging.MessageDeliveryFailure, + msg_publisher.DeliveryFailure, getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) self.assertTrue(mylog.warn.called) @@ -303,7 +302,7 @@ class TestPublisherPolicyReactions(TestPublisher): def test_published_with_policy_drop_and_rpc_down(self): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=drop' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect getattr(publisher, self.pub_func)(mock.MagicMock(), @@ -315,7 +314,7 @@ class TestPublisherPolicyReactions(TestPublisher): def test_published_with_policy_queue_and_rpc_down(self): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=queue' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect @@ -330,7 +329,7 @@ class TestPublisherPolicyReactions(TestPublisher): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=queue' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect getattr(publisher, self.pub_func)(mock.MagicMock(), @@ -354,7 +353,7 @@ class TestPublisherPolicyReactions(TestPublisher): publisher = self.publisher_cls(netutils.urlsplit( '%s://?policy=queue&max_queue_length=3' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect for i in range(0, 5): @@ -381,7 +380,7 @@ class TestPublisherPolicyReactions(TestPublisher): publisher = self.publisher_cls( netutils.urlsplit('%s://?policy=queue' % self.protocol)) - side_effect = oslo_messaging.MessageDeliveryFailure() + side_effect = msg_publisher.DeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect for i in range(0, 2000): |