summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2017-07-27 20:48:05 +0000
committerGerrit Code Review <review@openstack.org>2017-07-27 20:48:05 +0000
commit22e8481e70bd5268fcddbb1f595ff8078f88f404 (patch)
tree51c798411b77e88ecbd48e4c2f7453dfda757fe0
parentc83d15fd25eb2992a1d88705bedc9483764ac758 (diff)
parentaf23b6eeafcb7adc76f60bfcb04aee699c975e31 (diff)
downloadceilometer-22e8481e70bd5268fcddbb1f595ff8078f88f404.tar.gz
Merge "Deprecate kafka publisher"
-rw-r--r--ceilometer/publisher/kafka_broker.py4
-rw-r--r--ceilometer/publisher/messaging.py42
-rw-r--r--ceilometer/tests/unit/publisher/test_messaging_publisher.py47
-rw-r--r--releasenotes/notes/deprecate-kafka-publisher-17b4f221758e15da.yaml11
4 files changed, 99 insertions, 5 deletions
diff --git a/ceilometer/publisher/kafka_broker.py b/ceilometer/publisher/kafka_broker.py
index 8b8955f1..d3b35942 100644
--- a/ceilometer/publisher/kafka_broker.py
+++ b/ceilometer/publisher/kafka_broker.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+from debtcollector import removals
import kafka
from oslo_log import log
from oslo_serialization import jsonutils
@@ -24,6 +25,9 @@ from ceilometer.publisher import messaging
LOG = log.getLogger(__name__)
+@removals.removed_class("KafkaBrokerPublisher",
+ message="use NotifierPublisher instead",
+ removal_version='10.0')
class KafkaBrokerPublisher(messaging.MessagingPublisher):
"""Publish metering data to kafka broker.
diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py
index 7343030b..f62cfc5c 100644
--- a/ceilometer/publisher/messaging.py
+++ b/ceilometer/publisher/messaging.py
@@ -186,11 +186,49 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
class NotifierPublisher(MessagingPublisher):
+ """Publish metering data from notifer publisher.
+
+ The ip address and port number of notifer can be configured in
+ ceilometer pipeline configuration file.
+
+ User can customize the transport driver such as rabbit, kafka and
+ so on. The Notifer uses `sample` method as default method to send
+ notifications.
+
+ This publisher has transmit options such as queue, drop, and
+ retry. These options are specified using policy field of URL parameter.
+ When queue option could be selected, local queue length can be determined
+ using max_queue_length field as well. When the transfer fails with retry
+ option, try to resend the data as many times as specified in max_retry
+ field. If max_retry is not specified, by default the number of retry
+ is 100.
+
+ To enable this publisher, add the following section to the
+ /etc/ceilometer/pipeline.yaml file or simply add it to an existing
+ pipeline::
+
+ meter:
+ - name: meter_notifier
+ meters:
+ - "*"
+ sinks:
+ - notifier_sink
+ sinks:
+ - name: notifier_sink
+ transformers:
+ publishers:
+ - notifer://[notifier_ip]:[notifier_port]?topic=[topic]&
+ driver=driver&max_retry=100
+
+ """
+
def __init__(self, conf, parsed_url, default_topic):
super(NotifierPublisher, self).__init__(conf, parsed_url)
options = urlparse.parse_qs(parsed_url.query)
- topic = options.pop('topic', [default_topic])
+ topics = options.pop('topic', [default_topic])
driver = options.pop('driver', ['rabbit'])[0]
+ self.max_retry = int(options.get('max_retry', [100])[-1])
+
url = None
if parsed_url.netloc != '':
url = urlparse.urlunsplit([driver, parsed_url.netloc,
@@ -201,7 +239,7 @@ class NotifierPublisher(MessagingPublisher):
messaging.get_transport(self.conf, url),
driver=self.conf.publisher_notifier.telemetry_driver,
publisher_id='telemetry.publisher.%s' % self.conf.host,
- topics=topic,
+ topics=topics,
retry=self.retry
)
diff --git a/ceilometer/tests/unit/publisher/test_messaging_publisher.py b/ceilometer/tests/unit/publisher/test_messaging_publisher.py
index 203a48bc..6be8ada3 100644
--- a/ceilometer/tests/unit/publisher/test_messaging_publisher.py
+++ b/ceilometer/tests/unit/publisher/test_messaging_publisher.py
@@ -18,6 +18,8 @@ import datetime
import uuid
import mock
+import oslo_messaging
+from oslo_messaging._drivers import impl_kafka as kafka_driver
from oslo_utils import netutils
import testscenarios.testcase
@@ -147,6 +149,42 @@ class NotifierOnlyPublisherTest(BasePublisherTestCase):
cgt.assert_called_with(self.CONF, 'amqp://foo:foo@127.0.0.1:1234/foo'
'?amqp_auto_delete=true')
+ @mock.patch('ceilometer.messaging.get_transport')
+ def test_publish_with_none_rabbit_driver(self, cgt):
+ sample_publisher = msg_publisher.SampleNotifierPublisher(
+ self.CONF,
+ netutils.urlsplit('notifier://127.0.0.1:9092?driver=kafka'))
+ cgt.assert_called_with(self.CONF, 'kafka://127.0.0.1:9092')
+ transport = oslo_messaging.get_transport(self.CONF,
+ 'kafka://127.0.0.1:9092')
+ self.assertIsInstance(transport._driver, kafka_driver.KafkaDriver)
+
+ side_effect = msg_publisher.DeliveryFailure()
+ with mock.patch.object(sample_publisher, '_send') as fake_send:
+ fake_send.side_effect = side_effect
+ self.assertRaises(
+ msg_publisher.DeliveryFailure,
+ sample_publisher.publish_samples,
+ self.test_sample_data)
+ self.assertEqual(0, len(sample_publisher.local_queue))
+ self.assertEqual(100, len(fake_send.mock_calls))
+ fake_send.assert_called_with('metering', mock.ANY)
+
+ event_publisher = msg_publisher.EventNotifierPublisher(
+ self.CONF,
+ netutils.urlsplit('notifier://127.0.0.1:9092?driver=kafka'))
+ cgt.assert_called_with(self.CONF, 'kafka://127.0.0.1:9092')
+
+ with mock.patch.object(event_publisher, '_send') as fake_send:
+ fake_send.side_effect = side_effect
+ self.assertRaises(
+ msg_publisher.DeliveryFailure,
+ event_publisher.publish_events,
+ self.test_event_data)
+ self.assertEqual(0, len(event_publisher.local_queue))
+ self.assertEqual(100, len(fake_send.mock_calls))
+ fake_send.assert_called_with('event', mock.ANY)
+
class TestPublisher(testscenarios.testcase.WithScenarios,
BasePublisherTestCase):
@@ -186,7 +224,8 @@ class TestPublisherPolicy(TestPublisher):
self.assertTrue(mylog.info.called)
self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue))
- fake_send.assert_called_once_with(
+ self.assertEqual(100, len(fake_send.mock_calls))
+ fake_send.assert_called_with(
self.topic, mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG')
@@ -203,7 +242,8 @@ class TestPublisherPolicy(TestPublisher):
self.test_data)
self.assertTrue(mylog.info.called)
self.assertEqual(0, len(publisher.local_queue))
- fake_send.assert_called_once_with(
+ self.assertEqual(100, len(fake_send.mock_calls))
+ fake_send.assert_called_with(
self.topic, mock.ANY)
@mock.patch('ceilometer.publisher.messaging.LOG')
@@ -221,7 +261,8 @@ class TestPublisherPolicy(TestPublisher):
self.assertTrue(mylog.warning.called)
self.assertEqual('default', publisher.policy)
self.assertEqual(0, len(publisher.local_queue))
- fake_send.assert_called_once_with(
+ self.assertEqual(100, len(fake_send.mock_calls))
+ fake_send.assert_called_with(
self.topic, mock.ANY)
diff --git a/releasenotes/notes/deprecate-kafka-publisher-17b4f221758e15da.yaml b/releasenotes/notes/deprecate-kafka-publisher-17b4f221758e15da.yaml
new file mode 100644
index 00000000..0f58826d
--- /dev/null
+++ b/releasenotes/notes/deprecate-kafka-publisher-17b4f221758e15da.yaml
@@ -0,0 +1,11 @@
+---
+features:
+ - |
+ Ceilometer supports generic notifier to publish data and allow user to
+ customize parameters such as topic, transport driver and priority. The
+ publisher configuration in pipeline.yaml can be
+ notifer://[notifier_ip]:[notifier_port]?topic=[topic]&driver=driver&max_retry=100
+ Not only rabbit driver, but also other driver like kafka can be used.
+deprecations:
+ - |
+ Kafka publisher is deprecated to use generic notifier instead.