author     Mehdi Abaakouk <mehdi.abaakouk@enovance.com>   2013-11-18 15:26:41 +0100
committer  Mehdi Abaakouk <mehdi.abaakouk@enovance.com>   2014-04-14 11:56:56 +0200
commit     5995bf906cea07e699d3e2c0ebb5532e70bc84c5 (patch)
tree       5df969214dac9cebd3a49429a70c238aa62e71ec
parent     0eb9f5131be64392664fd0317f4cf388787b662b (diff)
download   ceilometer-5995bf906cea07e699d3e2c0ebb5532e70bc84c5.tar.gz
Replace oslo.rpc with oslo.messaging

This patch replaces oslo.rpc with oslo.messaging. The important changes:

- On the collector, the queue name and the topic are now the same
  ("metering" for both, instead of "ceilometer.collector.metering" for the
  queue and "metering" for the topic).
- Likewise for the alarm partitioner: the queue is no longer prefixed with
  "ceilometer.alarm".

Implements: switch-to-oslo.messaging
Change-Id: Ia5e4ff1dd1d419c090b8039627234ae7f07e8660
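For orientation, here is a minimal sketch (not part of the patch) of how a collector-style service now binds an RPC server to the shared "metering" topic through the ceilometer.messaging helper introduced by this change. The endpoint class and the literal topic string are illustrative, and ceilometer's configuration is assumed to have been parsed already:

    from ceilometer import messaging


    class MeteringEndpoint(object):
        """Illustrative endpoint; the real collector exposes record_metering_data."""

        def record_metering_data(self, context, data):
            # Called for every sample cast on the "metering" topic.
            print(data)


    # Initialise the oslo.messaging transport; assumes cfg.CONF was parsed first.
    messaging.setup()

    # With oslo.messaging the queue name now matches the topic, so this
    # consumes from "metering" instead of "ceilometer.collector.metering".
    server = messaging.get_rpc_server('metering', MeteringEndpoint())
    server.start()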
-rwxr-xr-x  bin/ceilometer-rpc-zmq-receiver | 53
-rw-r--r--  ceilometer/alarm/rpc.py | 58
-rw-r--r--  ceilometer/alarm/service.py | 46
-rw-r--r--  ceilometer/api/controllers/v2.py | 6
-rw-r--r--  ceilometer/cli.py | 14
-rw-r--r--  ceilometer/collector.py | 36
-rw-r--r--  ceilometer/compute/notifications/__init__.py | 14
-rw-r--r--  ceilometer/image/notifications.py | 14
-rw-r--r--  ceilometer/messaging.py | 91
-rw-r--r--  ceilometer/middleware.py | 15
-rw-r--r--  ceilometer/network/notifications.py | 17
-rw-r--r--  ceilometer/notification.py | 104
-rw-r--r--  ceilometer/openstack/common/notifier/__init__.py | 0
-rw-r--r--  ceilometer/openstack/common/notifier/api.py | 173
-rw-r--r--  ceilometer/openstack/common/notifier/log_notifier.py | 37
-rw-r--r--  ceilometer/openstack/common/notifier/no_op_notifier.py | 19
-rw-r--r--  ceilometer/openstack/common/notifier/proxy.py | 77
-rw-r--r--  ceilometer/openstack/common/notifier/rpc_notifier.py | 47
-rw-r--r--  ceilometer/openstack/common/notifier/rpc_notifier2.py | 53
-rw-r--r--  ceilometer/openstack/common/notifier/test_notifier.py | 21
-rw-r--r--  ceilometer/openstack/common/rpc/__init__.py | 275
-rw-r--r--  ceilometer/openstack/common/rpc/amqp.py | 637
-rw-r--r--  ceilometer/openstack/common/rpc/common.py | 508
-rw-r--r--  ceilometer/openstack/common/rpc/dispatcher.py | 178
-rw-r--r--  ceilometer/openstack/common/rpc/impl_fake.py | 195
-rw-r--r--  ceilometer/openstack/common/rpc/impl_kombu.py | 858
-rw-r--r--  ceilometer/openstack/common/rpc/impl_qpid.py | 823
-rw-r--r--  ceilometer/openstack/common/rpc/impl_zmq.py | 818
-rw-r--r--  ceilometer/openstack/common/rpc/matchmaker.py | 323
-rw-r--r--  ceilometer/openstack/common/rpc/matchmaker_redis.py | 144
-rw-r--r--  ceilometer/openstack/common/rpc/matchmaker_ring.py | 106
-rw-r--r--  ceilometer/openstack/common/rpc/proxy.py | 225
-rw-r--r--  ceilometer/openstack/common/rpc/serializer.py | 54
-rw-r--r--  ceilometer/openstack/common/rpc/service.py | 75
-rw-r--r--  ceilometer/openstack/common/rpc/zmq_receiver.py | 38
-rw-r--r--  ceilometer/orchestration/notifications.py | 15
-rw-r--r--  ceilometer/plugin.py | 25
-rw-r--r--  ceilometer/publisher/rpc.py | 70
-rw-r--r--  ceilometer/service.py | 4
-rw-r--r--  ceilometer/tests/alarm/partition/test_coordination.py | 4
-rw-r--r--  ceilometer/tests/alarm/test_notifier.py | 8
-rw-r--r--  ceilometer/tests/alarm/test_partitioned_alarm_svc.py | 12
-rw-r--r--  ceilometer/tests/alarm/test_rpc.py | 86
-rw-r--r--  ceilometer/tests/alarm/test_singleton_alarm_svc.py | 4
-rw-r--r--  ceilometer/tests/api/__init__.py | 5
-rw-r--r--  ceilometer/tests/api/v1/test_app.py | 3
-rw-r--r--  ceilometer/tests/api/v2/test_alarm_scenarios.py | 25
-rw-r--r--  ceilometer/tests/api/v2/test_app.py | 1
-rw-r--r--  ceilometer/tests/api/v2/test_post_samples_scenarios.py | 46
-rw-r--r--  ceilometer/tests/objectstore/test_swift_middleware.py | 3
-rw-r--r--  ceilometer/tests/publisher/test_rpc_publisher.py | 321
-rw-r--r--  ceilometer/tests/test_bin.py | 12
-rw-r--r--  ceilometer/tests/test_collector.py | 65
-rw-r--r--  ceilometer/tests/test_middleware.py | 6
-rw-r--r--  ceilometer/tests/test_notification.py | 115
-rw-r--r--  ceilometer/tests/test_plugin.py | 20
-rw-r--r--  ceilometer/volume/notifications.py | 14
-rw-r--r--  etc/ceilometer/ceilometer.conf.sample | 439
-rw-r--r--  openstack-common.conf | 2
-rw-r--r--  requirements.txt | 2
-rw-r--r--  setup.cfg | 7
-rw-r--r--  tools/config/oslo.config.generator.rc | 3
62 files changed, 929 insertions(+), 6540 deletions(-)
diff --git a/bin/ceilometer-rpc-zmq-receiver b/bin/ceilometer-rpc-zmq-receiver
deleted file mode 100755
index f6e878ef..00000000
--- a/bin/ceilometer-rpc-zmq-receiver
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/usr/bin/env python
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2011 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import eventlet
-eventlet.monkey_patch()
-
-import contextlib
-import os
-import sys
-
-# If ../ceilometer/__init__.py exists, add ../ to Python search path, so that
-# it will override what happens to be installed in /usr/(local/)lib/python...
-POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
- os.pardir,
- os.pardir))
-if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'ceilometer', '__init__.py')):
- sys.path.insert(0, POSSIBLE_TOPDIR)
-
-from oslo.config import cfg
-
-from ceilometer.openstack.common import log as logging
-from ceilometer.openstack.common import rpc
-from ceilometer.openstack.common.rpc import impl_zmq
-
-CONF = cfg.CONF
-CONF.register_opts(rpc.rpc_opts)
-CONF.register_opts(impl_zmq.zmq_opts)
-
-
-def main():
- CONF(sys.argv[1:], project='ceilometer')
- logging.setup("ceilometer")
-
- with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
- reactor.consume_in_thread()
- reactor.wait()
-
-if __name__ == '__main__':
- main()
diff --git a/ceilometer/alarm/rpc.py b/ceilometer/alarm/rpc.py
index 7ca4f759..f080fbe9 100644
--- a/ceilometer/alarm/rpc.py
+++ b/ceilometer/alarm/rpc.py
@@ -18,10 +18,10 @@
from oslo.config import cfg
+from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
-from ceilometer.openstack.common.rpc import proxy as rpc_proxy
from ceilometer.storage import models
OPTS = [
@@ -40,11 +40,11 @@ cfg.CONF.register_opts(OPTS, group='alarm')
LOG = log.getLogger(__name__)
-class RPCAlarmNotifier(rpc_proxy.RpcProxy):
+class RPCAlarmNotifier(object):
def __init__(self):
- super(RPCAlarmNotifier, self).__init__(
- default_version='1.0',
- topic=cfg.CONF.alarm.notifier_rpc_topic)
+ self.client = messaging.get_rpc_client(
+ topic=cfg.CONF.alarm.notifier_rpc_topic,
+ version="1.0")
def notify(self, alarm, previous, reason, reason_data):
actions = getattr(alarm, models.Alarm.ALARM_ACTIONS_MAP[alarm.state])
@@ -56,36 +56,36 @@ class RPCAlarmNotifier(rpc_proxy.RpcProxy):
'previous': previous,
'state': alarm.state})
return
- msg = self.make_msg('notify_alarm', data={
- 'actions': actions,
- 'alarm_id': alarm.alarm_id,
- 'previous': previous,
- 'current': alarm.state,
- 'reason': unicode(reason),
- 'reason_data': reason_data})
- self.cast(context.get_admin_context(), msg)
+ self.client.cast(context.get_admin_context(),
+ 'notify_alarm', data={
+ 'actions': actions,
+ 'alarm_id': alarm.alarm_id,
+ 'previous': previous,
+ 'current': alarm.state,
+ 'reason': unicode(reason),
+ 'reason_data': reason_data})
-class RPCAlarmPartitionCoordination(rpc_proxy.RpcProxy):
+class RPCAlarmPartitionCoordination(object):
def __init__(self):
- super(RPCAlarmPartitionCoordination, self).__init__(
- default_version='1.0',
- topic=cfg.CONF.alarm.partition_rpc_topic)
+ self.client = messaging.get_rpc_client(
+ topic=cfg.CONF.alarm.partition_rpc_topic,
+ version="1.0")
def presence(self, uuid, priority):
- msg = self.make_msg('presence', data={
- 'uuid': uuid,
- 'priority': priority})
- self.fanout_cast(context.get_admin_context(), msg)
+ cctxt = self.client.prepare(fanout=True)
+ return cctxt.cast(context.get_admin_context(),
+ 'presence', data={'uuid': uuid,
+ 'priority': priority})
def assign(self, uuid, alarms):
- msg = self.make_msg('assign', data={
- 'uuid': uuid,
- 'alarms': alarms})
- return self.fanout_cast(context.get_admin_context(), msg)
+ cctxt = self.client.prepare(fanout=True)
+ return cctxt.cast(context.get_admin_context(),
+ 'assign', data={'uuid': uuid,
+ 'alarms': alarms})
def allocate(self, uuid, alarms):
- msg = self.make_msg('allocate', data={
- 'uuid': uuid,
- 'alarms': alarms})
- return self.fanout_cast(context.get_admin_context(), msg)
+ cctxt = self.client.prepare(fanout=True)
+ return cctxt.cast(context.get_admin_context(),
+ 'allocate', data={'uuid': uuid,
+ 'alarms': alarms})
diff --git a/ceilometer/alarm/service.py b/ceilometer/alarm/service.py
index b73eb9e8..c51c2236 100644
--- a/ceilometer/alarm/service.py
+++ b/ceilometer/alarm/service.py
@@ -27,11 +27,10 @@ from stevedore import extension
from ceilometer.alarm.partition import coordination
from ceilometer.alarm import rpc as rpc_alarm
+from ceilometer import messaging
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
from ceilometer.openstack.common import network_utils
-from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
-from ceilometer.openstack.common.rpc import service as rpc_service
from ceilometer.openstack.common import service as os_service
@@ -137,26 +136,17 @@ class SingletonAlarmService(AlarmService, os_service.Service):
cfg.CONF.import_opt('host', 'ceilometer.service')
-class PartitionedAlarmService(AlarmService, rpc_service.Service):
+class PartitionedAlarmService(AlarmService, os_service.Service):
def __init__(self):
- super(PartitionedAlarmService, self).__init__(
- cfg.CONF.host,
- cfg.CONF.alarm.partition_rpc_topic,
- self
- )
+ super(PartitionedAlarmService, self).__init__()
+ self.rpc_server = messaging.get_rpc_server(
+ cfg.CONF.alarm.partition_rpc_topic, self)
+
self._load_evaluators()
self.api_client = None
self.partition_coordinator = coordination.PartitionCoordinator()
- def initialize_service_hook(self, service):
- LOG.debug(_('initialize_service_hooks'))
- self.conn.create_worker(
- cfg.CONF.alarm.partition_rpc_topic,
- rpc_dispatcher.RpcDispatcher([self]),
- 'ceilometer.alarm.' + cfg.CONF.alarm.partition_rpc_topic,
- )
-
def start(self):
super(PartitionedAlarmService, self).start()
if self.evaluators:
@@ -174,9 +164,14 @@ class PartitionedAlarmService(AlarmService, rpc_service.Service):
eval_interval,
self._evaluate_assigned_alarms,
eval_interval)
+ self.rpc_server.start()
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
+ def stop(self):
+ self.rpc_server.stop()
+ super(AlarmNotifierService, self).stop()
+
def _assigned_alarms(self):
return self.partition_coordinator.assigned_alarms(self._client)
@@ -193,27 +188,26 @@ class PartitionedAlarmService(AlarmService, rpc_service.Service):
data.get('alarms'))
-class AlarmNotifierService(rpc_service.Service):
+class AlarmNotifierService(os_service.Service):
EXTENSIONS_NAMESPACE = "ceilometer.alarm.notifier"
- def __init__(self, host, topic):
- super(AlarmNotifierService, self).__init__(host, topic, self)
+ def __init__(self):
+ super(AlarmNotifierService, self).__init__()
+ self.rpc_server = messaging.get_rpc_server(
+ cfg.CONF.alarm.notifier_rpc_topic, self)
self.notifiers = extension.ExtensionManager(self.EXTENSIONS_NAMESPACE,
invoke_on_load=True)
def start(self):
super(AlarmNotifierService, self).start()
+ self.rpc_server.start()
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
- def initialize_service_hook(self, service):
- LOG.debug(_('initialize_service_hooks'))
- self.conn.create_worker(
- cfg.CONF.alarm.notifier_rpc_topic,
- rpc_dispatcher.RpcDispatcher([self]),
- 'ceilometer.alarm.' + cfg.CONF.alarm.notifier_rpc_topic,
- )
+ def stop(self):
+ self.rpc_server.stop()
+ super(AlarmNotifierService, self).stop()
def _handle_action(self, action, alarm_id, previous,
current, reason, reason_data):
diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py
index fd91a4aa..827bc7ce 100644
--- a/ceilometer/api/controllers/v2.py
+++ b/ceilometer/api/controllers/v2.py
@@ -46,10 +46,10 @@ from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from ceilometer.api import acl
+from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
-from ceilometer.openstack.common.notifier import api as notify
from ceilometer.openstack.common import strutils
from ceilometer.openstack.common import timeutils
from ceilometer import sample
@@ -594,8 +594,8 @@ def _make_link(rel_name, url, type, type_arg, query=None):
def _send_notification(event, payload):
notification = event.replace(" ", "_")
notification = "alarm.%s" % notification
- notify.notify(None, notify.publisher_id("ceilometer.api"),
- notification, notify.INFO, payload)
+ notifier = messaging.get_notifier(publisher_id="ceilometer.api")
+ notifier.info(None, notification, payload)
class OldSample(_Base):
diff --git a/ceilometer/cli.py b/ceilometer/cli.py
index d37d30a8..2a24441a 100644
--- a/ceilometer/cli.py
+++ b/ceilometer/cli.py
@@ -24,8 +24,9 @@ import sys
import eventlet
# NOTE(jd) We need to monkey patch the socket and select module for,
-# at least, oslo.rpc, otherwise everything's blocked on its first read()
-# or select()
+# at least, oslo.messaging, otherwise everything's blocked on its
+# first read() or select(), thread need to be patched too, because
+# oslo.messaging use threading.local
eventlet.monkey_patch(socket=True, select=True, thread=True)
@@ -62,8 +63,7 @@ LOG = logging.getLogger(__name__)
def alarm_notifier():
service.prepare_service()
- os_service.launch(alarm_service.AlarmNotifierService(
- cfg.CONF.host, 'ceilometer.alarm')).wait()
+ os_service.launch(alarm_service.AlarmNotifierService()).wait()
def alarm_evaluator():
@@ -86,8 +86,7 @@ def agent_notification():
service.prepare_service()
launcher = os_service.ProcessLauncher()
launcher.launch_service(
- notification.NotificationService(cfg.CONF.host,
- 'ceilometer.agent.notification'),
+ notification.NotificationService(),
workers=service.get_workers('notification'))
launcher.wait()
@@ -102,8 +101,7 @@ def collector_service():
service.prepare_service()
launcher = os_service.ProcessLauncher()
launcher.launch_service(
- collector.CollectorService(cfg.CONF.host,
- 'ceilometer.collector'),
+ collector.CollectorService(),
workers=service.get_workers('collector'))
launcher.wait()
diff --git a/ceilometer/collector.py b/ceilometer/collector.py
index d475b2ad..e89563ac 100644
--- a/ceilometer/collector.py
+++ b/ceilometer/collector.py
@@ -21,10 +21,10 @@ import socket
import msgpack
from oslo.config import cfg
+from ceilometer import messaging
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
-from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
-from ceilometer.openstack.common.rpc import service as rpc_service
+from ceilometer.openstack.common import service as os_service
from ceilometer.openstack.common import units
from ceilometer import service
@@ -39,7 +39,6 @@ OPTS = [
]
cfg.CONF.register_opts(OPTS, group="collector")
-cfg.CONF.import_opt('rpc_backend', 'ceilometer.openstack.common.rpc')
cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.rpc',
group="publisher_rpc")
@@ -47,15 +46,25 @@ cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.rpc',
LOG = log.getLogger(__name__)
-class CollectorService(service.DispatchedService, rpc_service.Service):
+class CollectorService(service.DispatchedService, os_service.Service):
"""Listener for the collector service."""
+ @staticmethod
+ def rpc_enabled():
+ # cfg.CONF opt from oslo.messaging.transport
+ return cfg.CONF.rpc_backend or cfg.CONF.transport_url
+
def start(self):
"""Bind the UDP socket and handle incoming data."""
+ super(CollectorService, self).start()
if cfg.CONF.collector.udp_address:
self.tg.add_thread(self.start_udp)
- if cfg.CONF.rpc_backend:
- super(CollectorService, self).start()
+
+ if self.rpc_enabled():
+ self.rpc_server = messaging.get_rpc_server(
+ cfg.CONF.publisher_rpc.metering_topic, self)
+ self.rpc_server.start()
+
if not cfg.CONF.collector.udp_address:
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
@@ -85,23 +94,14 @@ class CollectorService(service.DispatchedService, rpc_service.Service):
def stop(self):
self.udp_run = False
+ if self.rpc_enabled():
+ self.rpc_server.stop()
super(CollectorService, self).stop()
- def initialize_service_hook(self, service):
- '''Consumers must be declared before consume_thread start.'''
- # Set ourselves up as a separate worker for the metering data,
- # since the default for service is to use create_consumer().
- self.conn.create_worker(
- cfg.CONF.publisher_rpc.metering_topic,
- rpc_dispatcher.RpcDispatcher([self]),
- 'ceilometer.collector.' + cfg.CONF.publisher_rpc.metering_topic,
- )
-
def record_metering_data(self, context, data):
"""RPC endpoint for messages we send to ourselves.
When the notification messages are re-published through the
RPC publisher, this method receives them for processing.
"""
- self.dispatcher_manager.map_method('record_metering_data',
- data=data)
+ self.dispatcher_manager.map_method('record_metering_data', data=data)
diff --git a/ceilometer/compute/notifications/__init__.py b/ceilometer/compute/notifications/__init__.py
index e3c11910..6f9b6a08 100644
--- a/ceilometer/compute/notifications/__init__.py
+++ b/ceilometer/compute/notifications/__init__.py
@@ -17,6 +17,7 @@
# under the License.
from oslo.config import cfg
+import oslo.messaging
from ceilometer import plugin
@@ -33,13 +34,10 @@ cfg.CONF.register_opts(OPTS)
class ComputeNotificationBase(plugin.NotificationBase):
@staticmethod
- def get_exchange_topics(conf):
- """Return a sequence of ExchangeTopics defining the exchange and
+ def get_targets(conf):
+ """Return a sequence of oslo.messaging.Target defining the exchange and
topics to be connected for this plugin.
"""
- return [
- plugin.ExchangeTopics(
- exchange=conf.nova_control_exchange,
- topics=set(topic + ".info"
- for topic in conf.notification_topics)),
- ]
+ return [oslo.messaging.Target(topic=topic,
+ exchange=conf.nova_control_exchange)
+ for topic in conf.notification_topics]
diff --git a/ceilometer/image/notifications.py b/ceilometer/image/notifications.py
index d87ed3a8..db7898cc 100644
--- a/ceilometer/image/notifications.py
+++ b/ceilometer/image/notifications.py
@@ -20,6 +20,7 @@
"""
from oslo.config import cfg
+import oslo.messaging
from ceilometer import plugin
from ceilometer import sample
@@ -38,16 +39,13 @@ class ImageBase(plugin.NotificationBase):
"""Base class for image counting."""
@staticmethod
- def get_exchange_topics(conf):
- """Return a sequence of ExchangeTopics defining the exchange and
+ def get_targets(conf):
+ """Return a sequence of oslo.messaging.Target defining the exchange and
topics to be connected for this plugin.
"""
- return [
- plugin.ExchangeTopics(
- exchange=conf.glance_control_exchange,
- topics=set(topic + ".info"
- for topic in conf.notification_topics)),
- ]
+ return [oslo.messaging.Target(topic=topic,
+ exchange=conf.glance_control_exchange)
+ for topic in conf.notification_topics]
class ImageCRUDBase(ImageBase):
diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py
new file mode 100644
index 00000000..4b66e3eb
--- /dev/null
+++ b/ceilometer/messaging.py
@@ -0,0 +1,91 @@
+# -*- encoding: utf-8 -*-
+# Copyright © 2013 eNovance <licensing@enovance.com>
+#
+# Author: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo.config import cfg
+import oslo.messaging
+
+TRANSPORT = None
+NOTIFIER = None
+
+_ALIASES = {
+ 'ceilometer.openstack.common.rpc.impl_kombu': 'rabbit',
+ 'ceilometer.openstack.common.rpc.impl_qpid': 'qpid',
+ 'ceilometer.openstack.common.rpc.impl_zmq': 'zmq',
+}
+
+
+def setup(url=None):
+ """Initialise the oslo.messaging layer."""
+ global TRANSPORT, NOTIFIER
+ if not TRANSPORT:
+ oslo.messaging.set_transport_defaults('ceilometer')
+ TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url,
+ aliases=_ALIASES)
+ if not NOTIFIER:
+ NOTIFIER = oslo.messaging.Notifier(TRANSPORT)
+
+
+def cleanup():
+ """Cleanup the oslo.messaging layer."""
+ global TRANSPORT, NOTIFIER
+ assert TRANSPORT is not None
+ assert NOTIFIER is not None
+ TRANSPORT.cleanup()
+ TRANSPORT = NOTIFIER = None
+
+
+def get_rpc_server(topic, endpoint):
+ """Return a configured oslo.messaging rpc server."""
+ global TRANSPORT
+ target = oslo.messaging.Target(server=cfg.CONF.host, topic=topic)
+ return oslo.messaging.get_rpc_server(TRANSPORT, target, [endpoint],
+ executor='eventlet')
+
+
+def get_rpc_client(**kwargs):
+ """Return a configured oslo.messaging RPCClient."""
+ global TRANSPORT
+ target = oslo.messaging.Target(**kwargs)
+ return oslo.messaging.RPCClient(TRANSPORT, target)
+
+
+def get_notification_listener(targets, endpoint):
+ """Return a configured oslo.messaging notification listener."""
+ global TRANSPORT
+ return oslo.messaging.get_notification_listener(
+ TRANSPORT, targets, [endpoint], executor='eventlet')
+
+
+def get_notifier(publisher_id):
+ """Return a configured oslo.messaging notifier."""
+ global NOTIFIER
+ return NOTIFIER.prepare(publisher_id=publisher_id)
+
+
+def convert_to_old_notification_format(priority, ctxt, publisher_id,
+ event_type, payload, metadata):
+ #FIXME(sileht): temporary convert notification to old format
+ #to focus on oslo.messaging migration before refactoring the code to
+ #use the new oslo.messaging facilities
+ notification = {'priority': priority,
+ 'payload': payload,
+ 'event_type': event_type,
+ 'publisher_id': publisher_id}
+ notification.update(metadata)
+ for k in ctxt:
+ notification['_context_' + k] = ctxt[k]
+ return notification
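A quick usage sketch of the helpers above (not part of the patch), mirroring how RPCAlarmNotifier and the v2 API's _send_notification() consume them elsewhere in this change; the topic, publisher_id, and payload values are illustrative and configuration is assumed to be loaded:

    from ceilometer import messaging
    from ceilometer.openstack.common import context

    messaging.setup()  # assumes cfg.CONF has already been parsed

    # RPC cast, as RPCAlarmNotifier.notify() now does; the topic string here
    # is illustrative (the service reads cfg.CONF.alarm.notifier_rpc_topic).
    client = messaging.get_rpc_client(topic='alarm_notifier', version='1.0')
    client.cast(context.get_admin_context(), 'notify_alarm',
                data={'alarm_id': 'example-id', 'current': 'alarm'})

    # Notification, as the v2 API's _send_notification() now does.
    notifier = messaging.get_notifier(publisher_id='ceilometer.api')
    notifier.info(None, 'alarm.creation', {'alarm_id': 'example-id'})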
diff --git a/ceilometer/middleware.py b/ceilometer/middleware.py
index e84f5258..3232f684 100644
--- a/ceilometer/middleware.py
+++ b/ceilometer/middleware.py
@@ -17,6 +17,7 @@
# under the License.
from oslo.config import cfg
+import oslo.messaging
from ceilometer import plugin
from ceilometer import sample
@@ -46,17 +47,13 @@ class HTTPRequest(plugin.NotificationBase):
event_types = ['http.request']
@staticmethod
- def get_exchange_topics(conf):
- """Return a sequence of ExchangeTopics defining the exchange and
+ def get_targets(conf):
+ """Return a sequence of oslo.messaging.Target defining the exchange and
topics to be connected for this plugin.
"""
- return [
- plugin.ExchangeTopics(
- exchange=exchange,
- topics=set(topic + ".info"
- for topic in conf.notification_topics))
- for exchange in conf.http_control_exchanges
- ]
+ return [oslo.messaging.Target(topic=topic, exchange=exchange)
+ for topic in conf.notification_topics
+ for exchange in conf.http_control_exchanges]
def process_notification(self, message):
yield sample.Sample.from_notification(
diff --git a/ceilometer/network/notifications.py b/ceilometer/network/notifications.py
index 9bdc1dbf..60a14c28 100644
--- a/ceilometer/network/notifications.py
+++ b/ceilometer/network/notifications.py
@@ -21,6 +21,7 @@
"""
from oslo.config import cfg
+import oslo.messaging
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
@@ -63,17 +64,13 @@ class NetworkNotificationBase(plugin.NotificationBase):
]
@staticmethod
- def get_exchange_topics(conf):
- """Return a sequence of ExchangeTopics defining the exchange and topics
- to be connected for this plugin.
-
+ def get_targets(conf):
+ """Return a sequence of oslo.messaging.Target defining the exchange and
+ topics to be connected for this plugin.
"""
- return [
- plugin.ExchangeTopics(
- exchange=conf.neutron_control_exchange,
- topics=set(topic + ".info"
- for topic in conf.notification_topics)),
- ]
+ return [oslo.messaging.Target(topic=topic,
+ exchange=conf.neutron_control_exchange)
+ for topic in conf.notification_topics]
def process_notification(self, message):
LOG.info(_('network notification %r') % message)
diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index bde5d6df..95abebf5 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -17,13 +17,15 @@
# under the License.
from oslo.config import cfg
+import oslo.messaging
from stevedore import extension
from ceilometer.event import converter as event_converter
+from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
-from ceilometer.openstack.common.rpc import service as rpc_service
+from ceilometer.openstack.common import service as os_service
from ceilometer import pipeline
from ceilometer import service
from ceilometer.storage import models
@@ -46,77 +48,54 @@ OPTS = [
cfg.CONF.register_opts(OPTS, group="notification")
-class UnableToSaveEventException(Exception):
- """Thrown when we want to requeue an event.
-
- Any exception is fine, but this one should make debugging
- a little easier.
- """
-
-
-class NotificationService(service.DispatchedService, rpc_service.Service):
+class NotificationService(service.DispatchedService, os_service.Service):
NOTIFICATION_NAMESPACE = 'ceilometer.notification'
def start(self):
super(NotificationService, self).start()
- # Add a dummy thread to have wait() working
- self.tg.add_timer(604800, lambda: None)
-
- def initialize_service_hook(self, service):
- '''Consumers must be declared before consume_thread start.'''
self.pipeline_manager = pipeline.setup_pipeline()
- LOG.debug(_('Loading event definitions'))
- self.event_converter = event_converter.setup_events(
- extension.ExtensionManager(
- namespace='ceilometer.event.trait_plugin'))
-
- self.notification_manager = \
- extension.ExtensionManager(
- namespace=self.NOTIFICATION_NAMESPACE,
- invoke_on_load=True,
- )
+ self.notification_manager = extension.ExtensionManager(
+ namespace=self.NOTIFICATION_NAMESPACE,
+ invoke_on_load=True,
+ )
if not list(self.notification_manager):
LOG.warning(_('Failed to load any notification handlers for %s'),
self.NOTIFICATION_NAMESPACE)
- self.notification_manager.map(self._setup_subscription)
- def _setup_subscription(self, ext, *args, **kwds):
- """Connect to message bus to get notifications
+ ack_on_error = cfg.CONF.notification.ack_on_event_error
- Configure the RPC connection to listen for messages on the
- right exchanges and topics so we receive all of the
- notifications.
+ targets = []
+ for ext in self.notification_manager:
+ handler = ext.obj
+ LOG.debug(_('Event types from %(name)s: %(type)s'
+ ' (ack_on_error=%(error)s)') %
+ {'name': ext.name,
+ 'type': ', '.join(handler.event_types),
+ 'error': ack_on_error})
+ targets.extend(handler.get_targets(cfg.CONF))
- Use a connection pool so that multiple notification agent instances
- can run in parallel to share load and without competing with each
- other for incoming messages.
+ self.listener = messaging.get_notification_listener(targets, self)
- """
- handler = ext.obj
- ack_on_error = cfg.CONF.notification.ack_on_event_error
- LOG.debug(_('Event types from %(name)s: %(type)s'
- ' (ack_on_error=%(error)s)') %
- {'name': ext.name,
- 'type': ', '.join(handler.event_types),
- 'error': ack_on_error})
-
- for exchange_topic in handler.get_exchange_topics(cfg.CONF):
- for topic in exchange_topic.topics:
- try:
- self.conn.join_consumer_pool(
- callback=self.process_notification,
- pool_name=topic,
- topic=topic,
- exchange_name=exchange_topic.exchange,
- ack_on_error=ack_on_error)
- except Exception:
- LOG.exception(_('Could not join consumer pool'
- ' %(topic)s/%(exchange)s') %
- {'topic': topic,
- 'exchange': exchange_topic.exchange})
+ LOG.debug(_('Loading event definitions'))
+ self.event_converter = event_converter.setup_events(
+ extension.ExtensionManager(
+ namespace='ceilometer.event.trait_plugin'))
+
+ self.listener.start()
+ # Add a dummy thread to have wait() working
+ self.tg.add_timer(604800, lambda: None)
+
+ def stop(self):
+ self.listener.stop()
+ super(NotificationService, self).stop()
+
+ def info(self, ctxt, publisher_id, event_type, payload, metadata):
+ notification = messaging.convert_to_old_notification_format(
+ 'info', ctxt, publisher_id, event_type, payload, metadata)
+ self.process_notification(notification)
def process_notification(self, notification):
"""RPC endpoint for notification messages
@@ -130,7 +109,9 @@ class NotificationService(service.DispatchedService, rpc_service.Service):
notification=notification)
if cfg.CONF.notification.store_events:
- self._message_to_event(notification)
+ return self._message_to_event(notification)
+ else:
+ return oslo.messaging.NotificationResult.HANDLED
def _message_to_event(self, body):
"""Convert message to Ceilometer Event.
@@ -138,6 +119,7 @@ class NotificationService(service.DispatchedService, rpc_service.Service):
NOTE: the rpc layer currently rips out the notification
delivery_info, which is critical to determining the
source of the notification. This will have to get added back later.
+
"""
event = self.event_converter.to_event(body)
@@ -147,9 +129,9 @@ class NotificationService(service.DispatchedService, rpc_service.Service):
for dispatcher in self.dispatcher_manager:
problem_events.extend(dispatcher.obj.record_events(event))
if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]:
- # Don't ack the message, raise to requeue it
- # if ack_on_error = False
- raise UnableToSaveEventException()
+ if not cfg.CONF.notification.ack_on_event_error:
+ return oslo.messaging.NotificationResult.REQUEUE
+ return oslo.messaging.NotificationResult.HANDLED
def _process_notification_for_ext(self, ext, notification):
"""Wrapper for calling pipelines when a notification arrives
diff --git a/ceilometer/openstack/common/notifier/__init__.py b/ceilometer/openstack/common/notifier/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/ceilometer/openstack/common/notifier/__init__.py
+++ /dev/null
diff --git a/ceilometer/openstack/common/notifier/api.py b/ceilometer/openstack/common/notifier/api.py
deleted file mode 100644
index 021c3ac8..00000000
--- a/ceilometer/openstack/common/notifier/api.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import socket
-import uuid
-
-from oslo.config import cfg
-
-from ceilometer.openstack.common import context
-from ceilometer.openstack.common.gettextutils import _, _LE
-from ceilometer.openstack.common import importutils
-from ceilometer.openstack.common import jsonutils
-from ceilometer.openstack.common import log as logging
-from ceilometer.openstack.common import timeutils
-
-
-LOG = logging.getLogger(__name__)
-
-notifier_opts = [
- cfg.MultiStrOpt('notification_driver',
- default=[],
- help='Driver or drivers to handle sending notifications'),
- cfg.StrOpt('default_notification_level',
- default='INFO',
- help='Default notification level for outgoing notifications'),
- cfg.StrOpt('default_publisher_id',
- default=None,
- help='Default publisher_id for outgoing notifications'),
-]
-
-CONF = cfg.CONF
-CONF.register_opts(notifier_opts)
-
-WARN = 'WARN'
-INFO = 'INFO'
-ERROR = 'ERROR'
-CRITICAL = 'CRITICAL'
-DEBUG = 'DEBUG'
-
-log_levels = (DEBUG, WARN, INFO, ERROR, CRITICAL)
-
-
-class BadPriorityException(Exception):
- pass
-
-
-def notify_decorator(name, fn):
- """Decorator for notify which is used from utils.monkey_patch().
-
- :param name: name of the function
- :param function: - object of the function
- :returns: function -- decorated function
-
- """
- def wrapped_func(*args, **kwarg):
- body = {}
- body['args'] = []
- body['kwarg'] = {}
- for arg in args:
- body['args'].append(arg)
- for key in kwarg:
- body['kwarg'][key] = kwarg[key]
-
- ctxt = context.get_context_from_function_and_args(fn, args, kwarg)
- notify(ctxt,
- CONF.default_publisher_id or socket.gethostname(),
- name,
- CONF.default_notification_level,
- body)
- return fn(*args, **kwarg)
- return wrapped_func
-
-
-def publisher_id(service, host=None):
- if not host:
- try:
- host = CONF.host
- except AttributeError:
- host = CONF.default_publisher_id or socket.gethostname()
- return "%s.%s" % (service, host)
-
-
-def notify(context, publisher_id, event_type, priority, payload):
- """Sends a notification using the specified driver
-
- :param publisher_id: the source worker_type.host of the message
- :param event_type: the literal type of event (ex. Instance Creation)
- :param priority: patterned after the enumeration of Python logging
- levels in the set (DEBUG, WARN, INFO, ERROR, CRITICAL)
- :param payload: A python dictionary of attributes
-
- Outgoing message format includes the above parameters, and appends the
- following:
-
- message_id
- a UUID representing the id for this notification
-
- timestamp
- the GMT timestamp the notification was sent at
-
- The composite message will be constructed as a dictionary of the above
- attributes, which will then be sent via the transport mechanism defined
- by the driver.
-
- Message example::
-
- {'message_id': str(uuid.uuid4()),
- 'publisher_id': 'compute.host1',
- 'timestamp': timeutils.utcnow(),
- 'priority': 'WARN',
- 'event_type': 'compute.create_instance',
- 'payload': {'instance_id': 12, ... }}
-
- """
- if priority not in log_levels:
- raise BadPriorityException(
- _('%s not in valid priorities') % priority)
-
- # Ensure everything is JSON serializable.
- payload = jsonutils.to_primitive(payload, convert_instances=True)
-
- msg = dict(message_id=str(uuid.uuid4()),
- publisher_id=publisher_id,
- event_type=event_type,
- priority=priority,
- payload=payload,
- timestamp=str(timeutils.utcnow()))
-
- for driver in _get_drivers():
- try:
- driver.notify(context, msg)
- except Exception as e:
- LOG.exception(_LE("Problem '%(e)s' attempting to "
- "send to notification system. "
- "Payload=%(payload)s")
- % dict(e=e, payload=payload))
-
-
-_drivers = None
-
-
-def _get_drivers():
- """Instantiate, cache, and return drivers based on the CONF."""
- global _drivers
- if _drivers is None:
- _drivers = {}
- for notification_driver in CONF.notification_driver:
- try:
- driver = importutils.import_module(notification_driver)
- _drivers[notification_driver] = driver
- except ImportError:
- LOG.exception(_LE("Failed to load notifier %s. "
- "These notifications will not be sent.") %
- notification_driver)
- return _drivers.values()
-
-
-def _reset_drivers():
- """Used by unit tests to reset the drivers."""
- global _drivers
- _drivers = None
diff --git a/ceilometer/openstack/common/notifier/log_notifier.py b/ceilometer/openstack/common/notifier/log_notifier.py
deleted file mode 100644
index 99955a14..00000000
--- a/ceilometer/openstack/common/notifier/log_notifier.py
+++ /dev/null
@@ -1,37 +0,0 @@
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo.config import cfg
-
-from ceilometer.openstack.common import jsonutils
-from ceilometer.openstack.common import log as logging
-
-
-CONF = cfg.CONF
-
-
-def notify(_context, message):
- """Notifies the recipient of the desired event given the model.
-
- Log notifications using OpenStack's default logging system.
- """
-
- priority = message.get('priority',
- CONF.default_notification_level)
- priority = priority.lower()
- logger = logging.getLogger(
- 'ceilometer.openstack.common.notification.%s' %
- message['event_type'])
- getattr(logger, priority)(jsonutils.dumps(message))
diff --git a/ceilometer/openstack/common/notifier/no_op_notifier.py b/ceilometer/openstack/common/notifier/no_op_notifier.py
deleted file mode 100644
index 13d946e3..00000000
--- a/ceilometer/openstack/common/notifier/no_op_notifier.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-def notify(_context, message):
- """Notifies the recipient of the desired event given the model."""
- pass
diff --git a/ceilometer/openstack/common/notifier/proxy.py b/ceilometer/openstack/common/notifier/proxy.py
deleted file mode 100644
index 79035cee..00000000
--- a/ceilometer/openstack/common/notifier/proxy.py
+++ /dev/null
@@ -1,77 +0,0 @@
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-A temporary helper which emulates oslo.messaging.Notifier.
-
-This helper method allows us to do the tedious porting to the new Notifier API
-as a standalone commit so that the commit which switches us to oslo.messaging
-is smaller and easier to review. This file will be removed as part of that
-commit.
-"""
-
-from oslo.config import cfg
-
-from ceilometer.openstack.common.notifier import api as notifier_api
-
-CONF = cfg.CONF
-
-
-class Notifier(object):
-
- def __init__(self, publisher_id):
- super(Notifier, self).__init__()
- self.publisher_id = publisher_id
-
- _marker = object()
-
- def prepare(self, publisher_id=_marker):
- ret = self.__class__(self.publisher_id)
- if publisher_id is not self._marker:
- ret.publisher_id = publisher_id
- return ret
-
- def _notify(self, ctxt, event_type, payload, priority):
- notifier_api.notify(ctxt,
- self.publisher_id,
- event_type,
- priority,
- payload)
-
- def audit(self, ctxt, event_type, payload):
- # No audit in old notifier.
- self._notify(ctxt, event_type, payload, 'INFO')
-
- def debug(self, ctxt, event_type, payload):
- self._notify(ctxt, event_type, payload, 'DEBUG')
-
- def info(self, ctxt, event_type, payload):
- self._notify(ctxt, event_type, payload, 'INFO')
-
- def warn(self, ctxt, event_type, payload):
- self._notify(ctxt, event_type, payload, 'WARN')
-
- warning = warn
-
- def error(self, ctxt, event_type, payload):
- self._notify(ctxt, event_type, payload, 'ERROR')
-
- def critical(self, ctxt, event_type, payload):
- self._notify(ctxt, event_type, payload, 'CRITICAL')
-
-
-def get_notifier(service=None, host=None, publisher_id=None):
- if not publisher_id:
- publisher_id = "%s.%s" % (service, host or CONF.host)
- return Notifier(publisher_id)
diff --git a/ceilometer/openstack/common/notifier/rpc_notifier.py b/ceilometer/openstack/common/notifier/rpc_notifier.py
deleted file mode 100644
index d2b8bdf4..00000000
--- a/ceilometer/openstack/common/notifier/rpc_notifier.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo.config import cfg
-
-from ceilometer.openstack.common import context as req_context
-from ceilometer.openstack.common.gettextutils import _LE
-from ceilometer.openstack.common import log as logging
-from ceilometer.openstack.common import rpc
-
-LOG = logging.getLogger(__name__)
-
-notification_topic_opt = cfg.ListOpt(
- 'notification_topics', default=['notifications', ],
- help='AMQP topic used for OpenStack notifications')
-
-CONF = cfg.CONF
-CONF.register_opt(notification_topic_opt)
-
-
-def notify(context, message):
- """Sends a notification via RPC."""
- if not context:
- context = req_context.get_admin_context()
- priority = message.get('priority',
- CONF.default_notification_level)
- priority = priority.lower()
- for topic in CONF.notification_topics:
- topic = '%s.%s' % (topic, priority)
- try:
- rpc.notify(context, topic, message)
- except Exception:
- LOG.exception(_LE("Could not send notification to %(topic)s. "
- "Payload=%(message)s"),
- {"topic": topic, "message": message})
diff --git a/ceilometer/openstack/common/notifier/rpc_notifier2.py b/ceilometer/openstack/common/notifier/rpc_notifier2.py
deleted file mode 100644
index 626f90d7..00000000
--- a/ceilometer/openstack/common/notifier/rpc_notifier2.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-'''messaging based notification driver, with message envelopes'''
-
-from oslo.config import cfg
-
-from ceilometer.openstack.common import context as req_context
-from ceilometer.openstack.common.gettextutils import _LE
-from ceilometer.openstack.common import log as logging
-from ceilometer.openstack.common import rpc
-
-LOG = logging.getLogger(__name__)
-
-notification_topic_opt = cfg.ListOpt(
- 'topics', default=['notifications', ],
- help='AMQP topic(s) used for OpenStack notifications')
-
-opt_group = cfg.OptGroup(name='rpc_notifier2',
- title='Options for rpc_notifier2')
-
-CONF = cfg.CONF
-CONF.register_group(opt_group)
-CONF.register_opt(notification_topic_opt, opt_group)
-
-
-def notify(context, message):
- """Sends a notification via RPC."""
- if not context:
- context = req_context.get_admin_context()
- priority = message.get('priority',
- CONF.default_notification_level)
- priority = priority.lower()
- for topic in CONF.rpc_notifier2.topics:
- topic = '%s.%s' % (topic, priority)
- try:
- rpc.notify(context, topic, message, envelope=True)
- except Exception:
- LOG.exception(_LE("Could not send notification to %(topic)s. "
- "Payload=%(message)s"),
- {"topic": topic, "message": message})
diff --git a/ceilometer/openstack/common/notifier/test_notifier.py b/ceilometer/openstack/common/notifier/test_notifier.py
deleted file mode 100644
index 11fc21fc..00000000
--- a/ceilometer/openstack/common/notifier/test_notifier.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-NOTIFICATIONS = []
-
-
-def notify(_context, message):
- """Test notifier, stores notifications in memory for unittests."""
- NOTIFICATIONS.append(message)
diff --git a/ceilometer/openstack/common/rpc/__init__.py b/ceilometer/openstack/common/rpc/__init__.py
deleted file mode 100644
index 6c680837..00000000
--- a/ceilometer/openstack/common/rpc/__init__.py
+++ /dev/null
@@ -1,275 +0,0 @@
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-# Copyright 2011 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-A remote procedure call (rpc) abstraction.
-
-For some wrappers that add message versioning to rpc, see:
- rpc.dispatcher
- rpc.proxy
-"""
-
-from oslo.config import cfg
-
-from ceilometer.openstack.common import importutils
-from ceilometer.openstack.common import log as logging
-
-
-LOG = logging.getLogger(__name__)
-
-
-rpc_opts = [
- cfg.StrOpt('rpc_backend',
- default='%s.impl_kombu' % __package__,
- help="The messaging module to use, defaults to kombu."),
- cfg.IntOpt('rpc_thread_pool_size',
- default=64,
- help='Size of RPC thread pool'),
- cfg.IntOpt('rpc_conn_pool_size',
- default=30,
- help='Size of RPC connection pool'),
- cfg.IntOpt('rpc_response_timeout',
- default=60,
- help='Seconds to wait for a response from call or multicall'),
- cfg.IntOpt('rpc_cast_timeout',
- default=30,
- help='Seconds to wait before a cast expires (TTL). '
- 'Only supported by impl_zmq.'),
- cfg.ListOpt('allowed_rpc_exception_modules',
- default=['nova.exception',
- 'cinder.exception',
- 'exceptions',
- ],
- help='Modules of exceptions that are permitted to be recreated'
- ' upon receiving exception data from an rpc call.'),
- cfg.BoolOpt('fake_rabbit',
- default=False,
- help='If passed, use a fake RabbitMQ provider'),
- cfg.StrOpt('control_exchange',
- default='openstack',
- help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
-]
-
-CONF = cfg.CONF
-CONF.register_opts(rpc_opts)
-
-
-def set_defaults(control_exchange):
- cfg.set_defaults(rpc_opts,
- control_exchange=control_exchange)
-
-
-def create_connection(new=True):
- """Create a connection to the message bus used for rpc.
-
- For some example usage of creating a connection and some consumers on that
- connection, see nova.service.
-
- :param new: Whether or not to create a new connection. A new connection
- will be created by default. If new is False, the
- implementation is free to return an existing connection from a
- pool.
-
- :returns: An instance of openstack.common.rpc.common.Connection
- """
- return _get_impl().create_connection(CONF, new=new)
-
-
-def call(context, topic, msg, timeout=None):
- """Invoke a remote method that returns something.
-
- :param context: Information that identifies the user that has made this
- request.
- :param topic: The topic to send the rpc message to. This correlates to the
- topic argument of
- openstack.common.rpc.common.Connection.create_consumer()
- and only applies when the consumer was created with
- fanout=False.
- :param msg: This is a dict in the form { "method" : "method_to_invoke",
- "args" : dict_of_kwargs }
- :param timeout: int, number of seconds to use for a response timeout.
- If set, this overrides the rpc_response_timeout option.
-
- :returns: A dict from the remote method.
-
- :raises: openstack.common.rpc.common.Timeout if a complete response
- is not received before the timeout is reached.
- """
- return _get_impl().call(CONF, context, topic, msg, timeout)
-
-
-def cast(context, topic, msg):
- """Invoke a remote method that does not return anything.
-
- :param context: Information that identifies the user that has made this
- request.
- :param topic: The topic to send the rpc message to. This correlates to the
- topic argument of
- openstack.common.rpc.common.Connection.create_consumer()
- and only applies when the consumer was created with
- fanout=False.
- :param msg: This is a dict in the form { "method" : "method_to_invoke",
- "args" : dict_of_kwargs }
-
- :returns: None
- """
- return _get_impl().cast(CONF, context, topic, msg)
-
-
-def fanout_cast(context, topic, msg):
- """Broadcast a remote method invocation with no return.
-
- This method will get invoked on all consumers that were set up with this
- topic name and fanout=True.
-
- :param context: Information that identifies the user that has made this
- request.
- :param topic: The topic to send the rpc message to. This correlates to the
- topic argument of
- openstack.common.rpc.common.Connection.create_consumer()
- and only applies when the consumer was created with
- fanout=True.
- :param msg: This is a dict in the form { "method" : "method_to_invoke",
- "args" : dict_of_kwargs }
-
- :returns: None
- """
- return _get_impl().fanout_cast(CONF, context, topic, msg)
-
-
-def multicall(context, topic, msg, timeout=None):
- """Invoke a remote method and get back an iterator.
-
- In this case, the remote method will be returning multiple values in
- separate messages, so the return values can be processed as the come in via
- an iterator.
-
- :param context: Information that identifies the user that has made this
- request.
- :param topic: The topic to send the rpc message to. This correlates to the
- topic argument of
- openstack.common.rpc.common.Connection.create_consumer()
- and only applies when the consumer was created with
- fanout=False.
- :param msg: This is a dict in the form { "method" : "method_to_invoke",
- "args" : dict_of_kwargs }
- :param timeout: int, number of seconds to use for a response timeout.
- If set, this overrides the rpc_response_timeout option.
-
- :returns: An iterator. The iterator will yield a tuple (N, X) where N is
- an index that starts at 0 and increases by one for each value
- returned and X is the Nth value that was returned by the remote
- method.
-
- :raises: openstack.common.rpc.common.Timeout if a complete response
- is not received before the timeout is reached.
- """
- return _get_impl().multicall(CONF, context, topic, msg, timeout)
-
-
-def notify(context, topic, msg, envelope=False):
- """Send notification event.
-
- :param context: Information that identifies the user that has made this
- request.
- :param topic: The topic to send the notification to.
- :param msg: This is a dict of content of event.
- :param envelope: Set to True to enable message envelope for notifications.
-
- :returns: None
- """
- return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
-
-
-def cleanup():
- """Clean up resources in use by implementation.
-
- Clean up any resources that have been allocated by the RPC implementation.
- This is typically open connections to a messaging service. This function
- would get called before an application using this API exits to allow
- connections to get torn down cleanly.
-
- :returns: None
- """
- return _get_impl().cleanup()
-
-
-def cast_to_server(context, server_params, topic, msg):
- """Invoke a remote method that does not return anything.
-
- :param context: Information that identifies the user that has made this
- request.
- :param server_params: Connection information
- :param topic: The topic to send the notification to.
- :param msg: This is a dict in the form { "method" : "method_to_invoke",
- "args" : dict_of_kwargs }
-
- :returns: None
- """
- return _get_impl().cast_to_server(CONF, context, server_params, topic,
- msg)
-
-
-def fanout_cast_to_server(context, server_params, topic, msg):
- """Broadcast to a remote method invocation with no return.
-
- :param context: Information that identifies the user that has made this
- request.
- :param server_params: Connection information
- :param topic: The topic to send the notification to.
- :param msg: This is a dict in the form { "method" : "method_to_invoke",
- "args" : dict_of_kwargs }
-
- :returns: None
- """
- return _get_impl().fanout_cast_to_server(CONF, context, server_params,
- topic, msg)
-
-
-def queue_get_for(context, topic, host):
- """Get a queue name for a given topic + host.
-
- This function only works if this naming convention is followed on the
- consumer side, as well. For example, in nova, every instance of the
- nova-foo service calls create_consumer() for two topics:
-
- foo
- foo.<host>
-
- Messages sent to the 'foo' topic are distributed to exactly one instance of
- the nova-foo service. The services are chosen in a round-robin fashion.
- Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
- <host>.
- """
- return '%s.%s' % (topic, host) if host else topic
-
-
-_RPCIMPL = None
-
-
-def _get_impl():
- """Delay import of rpc_backend until configuration is loaded."""
- global _RPCIMPL
- if _RPCIMPL is None:
- try:
- _RPCIMPL = importutils.import_module(CONF.rpc_backend)
- except ImportError:
- # For backwards compatibility with older nova config.
- impl = CONF.rpc_backend.replace('nova.rpc',
- 'nova.openstack.common.rpc')
- _RPCIMPL = importutils.import_module(impl)
- return _RPCIMPL
diff --git a/ceilometer/openstack/common/rpc/amqp.py b/ceilometer/openstack/common/rpc/amqp.py
deleted file mode 100644
index 36c555bf..00000000
--- a/ceilometer/openstack/common/rpc/amqp.py
+++ /dev/null
@@ -1,637 +0,0 @@
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-# Copyright 2011 - 2012, Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Shared code between AMQP based openstack.common.rpc implementations.
-
-The code in this module is shared between the rpc implementations based on
-AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
-uses AMQP, but is deprecated and predates this code.
-"""
-
-import collections
-import inspect
-import sys
-import uuid
-
-from eventlet import greenpool
-from eventlet import pools
-from eventlet import queue
-from eventlet import semaphore
-from oslo.config import cfg
-import six
-
-
-from ceilometer.openstack.common import excutils
-from ceilometer.openstack.common.gettextutils import _, _LE
-from ceilometer.openstack.common import local
-from ceilometer.openstack.common import log as logging
-from ceilometer.openstack.common.rpc import common as rpc_common
-
-
-amqp_opts = [
- cfg.BoolOpt('amqp_durable_queues',
- default=False,
- deprecated_name='rabbit_durable_queues',
- deprecated_group='DEFAULT',
- help='Use durable queues in amqp.'),
- cfg.BoolOpt('amqp_auto_delete',
- default=False,
- help='Auto-delete queues in amqp.'),
-]
-
-cfg.CONF.register_opts(amqp_opts)
-
-UNIQUE_ID = '_unique_id'
-LOG = logging.getLogger(__name__)
-
-
-class Pool(pools.Pool):
- """Class that implements a Pool of Connections."""
- def __init__(self, conf, connection_cls, *args, **kwargs):
- self.connection_cls = connection_cls
- self.conf = conf
- kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
- kwargs.setdefault("order_as_stack", True)
- super(Pool, self).__init__(*args, **kwargs)
- self.reply_proxy = None
-
- # TODO(comstud): Timeout connections not used in a while
- def create(self):
- LOG.debug('Pool creating new connection')
- return self.connection_cls(self.conf)
-
- def empty(self):
- while self.free_items:
- self.get().close()
- # Force a new connection pool to be created.
- # Note that this was added due to failing unit test cases. The issue
- # is the above "while loop" gets all the cached connections from the
- # pool and closes them, but never returns them to the pool, a pool
- # leak. The unit tests hang waiting for an item to be returned to the
- # pool. The unit tests get here via the tearDown() method. In the run
- # time code, it gets here via cleanup() and only appears in service.py
- # just before doing a sys.exit(), so cleanup() only happens once and
- # the leakage is not a problem.
- self.connection_cls.pool = None
-
-
-_pool_create_sem = semaphore.Semaphore()
-
-
-def get_connection_pool(conf, connection_cls):
- with _pool_create_sem:
- # Make sure only one thread tries to create the connection pool.
- if not connection_cls.pool:
- connection_cls.pool = Pool(conf, connection_cls)
- return connection_cls.pool
-
-
-class ConnectionContext(rpc_common.Connection):
- """The class that is actually returned to the create_connection() caller.
-
- This is essentially a wrapper around Connection that supports 'with'.
- It can also return a new Connection, or one from a pool.
-
- The class also detects when an instance is about to be deleted, so that
- Connections can be returned to the pool on exceptions and so forth without
- making the caller responsible for catching them. Whenever possible it makes
- sure the connection is returned to the pool.
- """
-
- def __init__(self, conf, connection_pool, pooled=True, server_params=None):
- """Create a new connection, or get one from the pool."""
- self.connection = None
- self.conf = conf
- self.connection_pool = connection_pool
- if pooled:
- self.connection = connection_pool.get()
- else:
- self.connection = connection_pool.connection_cls(
- conf,
- server_params=server_params)
- self.pooled = pooled
-
- def __enter__(self):
- """When with ConnectionContext() is used, return self."""
- return self
-
- def _done(self):
- """If the connection came from a pool, clean it up and put it back.
- If it did not come from a pool, close it.
- """
- if self.connection:
- if self.pooled:
- # Reset the connection so it's ready for the next caller
- # to grab from the pool
- self.connection.reset()
- self.connection_pool.put(self.connection)
- else:
- try:
- self.connection.close()
- except Exception:
- pass
- self.connection = None
-
- def __exit__(self, exc_type, exc_value, tb):
- """End of 'with' statement. We're done here."""
- self._done()
-
- def __del__(self):
- """Caller is done with this connection. Make sure we cleaned up."""
- self._done()
-
- def close(self):
- """Caller is done with this connection."""
- self._done()
-
- def create_consumer(self, topic, proxy, fanout=False):
- self.connection.create_consumer(topic, proxy, fanout)
-
- def create_worker(self, topic, proxy, pool_name):
- self.connection.create_worker(topic, proxy, pool_name)
-
- def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
- ack_on_error=True):
- self.connection.join_consumer_pool(callback,
- pool_name,
- topic,
- exchange_name,
- ack_on_error)
-
- def consume_in_thread(self):
- return self.connection.consume_in_thread()
-
- def __getattr__(self, key):
- """Proxy all other calls to the Connection instance."""
- if self.connection:
- return getattr(self.connection, key)
- else:
- raise rpc_common.InvalidRPCConnectionReuse()
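In practice the wrapper above enables the following pattern, where conf, pool and msg are assumed to come from the surrounding code:

    with ConnectionContext(conf, pool) as conn:
        conn.topic_send('metering', rpc_common.serialize_msg(msg))
    # on exit the connection is reset and handed back to the pool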
-
-
-class ReplyProxy(ConnectionContext):
- """Connection class for RPC replies / callbacks."""
- def __init__(self, conf, connection_pool):
- self._call_waiters = {}
- self._num_call_waiters = 0
- self._num_call_waiters_wrn_threshold = 10
- self._reply_q = 'reply_' + uuid.uuid4().hex
- super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
- self.declare_direct_consumer(self._reply_q, self._process_data)
- self.consume_in_thread()
-
- def _process_data(self, message_data):
- msg_id = message_data.pop('_msg_id', None)
- waiter = self._call_waiters.get(msg_id)
- if not waiter:
- LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s'
- ', message : %(data)s'), {'msg_id': msg_id,
- 'data': message_data})
- LOG.warn(_('_call_waiters: %s') % str(self._call_waiters))
- else:
- waiter.put(message_data)
-
- def add_call_waiter(self, waiter, msg_id):
- self._num_call_waiters += 1
- if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
- LOG.warn(_('Number of call waiters is greater than warning '
- 'threshold: %d. There could be a MulticallProxyWaiter '
- 'leak.') % self._num_call_waiters_wrn_threshold)
- self._num_call_waiters_wrn_threshold *= 2
- self._call_waiters[msg_id] = waiter
-
- def del_call_waiter(self, msg_id):
- self._num_call_waiters -= 1
- del self._call_waiters[msg_id]
-
- def get_reply_q(self):
- return self._reply_q
-
-
-def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
- failure=None, ending=False, log_failure=True):
- """Sends a reply or an error on the channel signified by msg_id.
-
- Failure should be a sys.exc_info() tuple.
-
- """
- with ConnectionContext(conf, connection_pool) as conn:
- if failure:
- failure = rpc_common.serialize_remote_exception(failure,
- log_failure)
-
- msg = {'result': reply, 'failure': failure}
- if ending:
- msg['ending'] = True
- _add_unique_id(msg)
- # If a reply_q exists, add the msg_id to the reply and pass the
- # reply_q to direct_send() to use it as the response queue.
- # Otherwise use the msg_id for backward compatibility.
- if reply_q:
- msg['_msg_id'] = msg_id
- conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
- else:
- conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
-
-
-class RpcContext(rpc_common.CommonRpcContext):
- """Context that supports replying to a rpc.call."""
- def __init__(self, **kwargs):
- self.msg_id = kwargs.pop('msg_id', None)
- self.reply_q = kwargs.pop('reply_q', None)
- self.conf = kwargs.pop('conf')
- super(RpcContext, self).__init__(**kwargs)
-
- def deepcopy(self):
- values = self.to_dict()
- values['conf'] = self.conf
- values['msg_id'] = self.msg_id
- values['reply_q'] = self.reply_q
- return self.__class__(**values)
-
- def reply(self, reply=None, failure=None, ending=False,
- connection_pool=None, log_failure=True):
- if self.msg_id:
- msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
- reply, failure, ending, log_failure)
- if ending:
- self.msg_id = None
-
-
-def unpack_context(conf, msg):
- """Unpack context from msg."""
- context_dict = {}
- for key in list(msg.keys()):
- # NOTE(vish): Some versions of python don't like unicode keys
- # in kwargs.
- key = str(key)
- if key.startswith('_context_'):
- value = msg.pop(key)
- context_dict[key[9:]] = value
- context_dict['msg_id'] = msg.pop('_msg_id', None)
- context_dict['reply_q'] = msg.pop('_reply_q', None)
- context_dict['conf'] = conf
- ctx = RpcContext.from_dict(context_dict)
- rpc_common._safe_log(LOG.debug, 'unpacked context: %s', ctx.to_dict())
- return ctx
-
-
-def pack_context(msg, context):
- """Pack context into msg.
-
- Values for message keys need to be less than 255 chars, so we pull
- context out into a bunch of separate keys. If we want to support
- more arguments in rabbit messages, we may want to do the same
- for args at some point.
-
- """
- if isinstance(context, dict):
- context_d = dict([('_context_%s' % key, value)
- for (key, value) in six.iteritems(context)])
- else:
- context_d = dict([('_context_%s' % key, value)
- for (key, value) in
- six.iteritems(context.to_dict())])
-
- msg.update(context_d)
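To make the flattening concrete (values are illustrative):

    msg = {'method': 'echo', 'args': {'value': 42}}
    pack_context(msg, {'user_id': 'alice', 'project_id': 'demo'})
    # msg now also carries '_context_user_id': 'alice' and
    # '_context_project_id': 'demo' alongside its original keys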
-
-
-class _MsgIdCache(object):
- """This class checks any duplicate messages."""
-
- # NOTE: This value could be made a configuration item, but it
- # rarely needs to change in practice,
- # so it is kept static for now.
- DUP_MSG_CHECK_SIZE = 16
-
- def __init__(self, **kwargs):
- self.prev_msgids = collections.deque([],
- maxlen=self.DUP_MSG_CHECK_SIZE)
-
- def check_duplicate_message(self, message_data):
- """AMQP consumers may read same message twice when exceptions occur
- before ack is returned. This method prevents doing it.
- """
- if UNIQUE_ID in message_data:
- msg_id = message_data[UNIQUE_ID]
- if msg_id not in self.prev_msgids:
- self.prev_msgids.append(msg_id)
- else:
- raise rpc_common.DuplicateMessageError(msg_id=msg_id)
-
-
-def _add_unique_id(msg):
- """Add unique_id for checking duplicate messages."""
- unique_id = uuid.uuid4().hex
- msg.update({UNIQUE_ID: unique_id})
- LOG.debug('UNIQUE_ID is %s.' % (unique_id))
-
-
-class _ThreadPoolWithWait(object):
- """Base class for a delayed invocation manager.
-
- Used by the Connection class to start up green threads
- to handle incoming messages.
- """
-
- def __init__(self, conf, connection_pool):
- self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
- self.connection_pool = connection_pool
- self.conf = conf
-
- def wait(self):
- """Wait for all callback threads to exit."""
- self.pool.waitall()
-
-
-class CallbackWrapper(_ThreadPoolWithWait):
- """Wraps a straight callback.
-
- Allows it to be invoked in a green thread.
- """
-
- def __init__(self, conf, callback, connection_pool,
- wait_for_consumers=False):
- """Initiates CallbackWrapper object.
-
- :param conf: cfg.CONF instance
- :param callback: a callable (probably a function)
- :param connection_pool: connection pool as returned by
- get_connection_pool()
- :param wait_for_consumers: wait for all green threads to
- complete and raise the last
- caught exception, if any.
-
- """
- super(CallbackWrapper, self).__init__(
- conf=conf,
- connection_pool=connection_pool,
- )
- self.callback = callback
- self.wait_for_consumers = wait_for_consumers
- self.exc_info = None
-
- def _wrap(self, message_data, **kwargs):
- """Wrap the callback invocation to catch exceptions.
- """
- try:
- self.callback(message_data, **kwargs)
- except Exception:
- self.exc_info = sys.exc_info()
-
- def __call__(self, message_data):
- self.exc_info = None
- self.pool.spawn_n(self._wrap, message_data)
-
- if self.wait_for_consumers:
- self.pool.waitall()
- if self.exc_info:
- six.reraise(self.exc_info[1], None, self.exc_info[2])
-
-
-class ProxyCallback(_ThreadPoolWithWait):
- """Calls methods on a proxy object based on method and args."""
-
- def __init__(self, conf, proxy, connection_pool):
- super(ProxyCallback, self).__init__(
- conf=conf,
- connection_pool=connection_pool,
- )
- self.proxy = proxy
- self.msg_id_cache = _MsgIdCache()
-
- def __call__(self, message_data):
- """Consumer callback to call a method on a proxy object.
-
- Parses the message for validity and fires off a thread to call the
- proxy object method.
-
- Message data should be a dictionary with two keys:
- method: string representing the method to call
- args: dictionary of arg: value
-
- Example: {'method': 'echo', 'args': {'value': 42}}
-
- """
- # It is important to clear the context here, because at this point
- # the previous context is stored in local.store.context
- if hasattr(local.store, 'context'):
- del local.store.context
- rpc_common._safe_log(LOG.debug, 'received %s', message_data)
- self.msg_id_cache.check_duplicate_message(message_data)
- ctxt = unpack_context(self.conf, message_data)
- method = message_data.get('method')
- args = message_data.get('args', {})
- version = message_data.get('version')
- namespace = message_data.get('namespace')
- if not method:
- LOG.warn(_('no method for message: %s') % message_data)
- ctxt.reply(_('No method for message: %s') % message_data,
- connection_pool=self.connection_pool)
- return
- self.pool.spawn_n(self._process_data, ctxt, version, method,
- namespace, args)
-
- def _process_data(self, ctxt, version, method, namespace, args):
- """Process a message in a new thread.
-
- If the proxy object we have has a dispatch method
- (see rpc.dispatcher.RpcDispatcher), pass it the version,
- method, and args and let it dispatch as appropriate. If not, use
- the old behavior of magically calling the specified method on the
- proxy we have here.
- """
- ctxt.update_store()
- try:
- rval = self.proxy.dispatch(ctxt, version, method, namespace,
- **args)
- # Check if the result was a generator
- if inspect.isgenerator(rval):
- for x in rval:
- ctxt.reply(x, None, connection_pool=self.connection_pool)
- else:
- ctxt.reply(rval, None, connection_pool=self.connection_pool)
- # This final None tells multicall that it is done.
- ctxt.reply(ending=True, connection_pool=self.connection_pool)
- except rpc_common.ClientException as e:
- LOG.debug('Expected exception during message handling (%s)' %
- e._exc_info[1])
- ctxt.reply(None, e._exc_info,
- connection_pool=self.connection_pool,
- log_failure=False)
- except Exception:
- # sys.exc_info() is deleted by LOG.exception().
- exc_info = sys.exc_info()
- LOG.error(_LE('Exception during message handling'),
- exc_info=exc_info)
- ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
-
-
-class MulticallProxyWaiter(object):
- def __init__(self, conf, msg_id, timeout, connection_pool):
- self._msg_id = msg_id
- self._timeout = timeout or conf.rpc_response_timeout
- self._reply_proxy = connection_pool.reply_proxy
- self._done = False
- self._got_ending = False
- self._conf = conf
- self._dataqueue = queue.LightQueue()
- # Add this caller to the reply proxy's call_waiters
- self._reply_proxy.add_call_waiter(self, self._msg_id)
- self.msg_id_cache = _MsgIdCache()
-
- def put(self, data):
- self._dataqueue.put(data)
-
- def done(self):
- if self._done:
- return
- self._done = True
- # Remove this caller from reply proxy's call_waiters
- self._reply_proxy.del_call_waiter(self._msg_id)
-
- def _process_data(self, data):
- result = None
- self.msg_id_cache.check_duplicate_message(data)
- if data['failure']:
- failure = data['failure']
- result = rpc_common.deserialize_remote_exception(self._conf,
- failure)
- elif data.get('ending', False):
- self._got_ending = True
- else:
- result = data['result']
- return result
-
- def __iter__(self):
- """Return a result until we get a reply with an 'ending' flag."""
- if self._done:
- raise StopIteration
- while True:
- try:
- data = self._dataqueue.get(timeout=self._timeout)
- result = self._process_data(data)
- except queue.Empty:
- self.done()
- raise rpc_common.Timeout()
- except Exception:
- with excutils.save_and_reraise_exception():
- self.done()
- if self._got_ending:
- self.done()
- raise StopIteration
- if isinstance(result, Exception):
- self.done()
- raise result
- yield result
-
-
-def create_connection(conf, new, connection_pool):
- """Create a connection."""
- return ConnectionContext(conf, connection_pool, pooled=not new)
-
-
-_reply_proxy_create_sem = semaphore.Semaphore()
-
-
-def multicall(conf, context, topic, msg, timeout, connection_pool):
- """Make a call that returns multiple times."""
- LOG.debug('Making synchronous call on %s ...', topic)
- msg_id = uuid.uuid4().hex
- msg.update({'_msg_id': msg_id})
- LOG.debug('MSG_ID is %s' % (msg_id))
- _add_unique_id(msg)
- pack_context(msg, context)
-
- with _reply_proxy_create_sem:
- if not connection_pool.reply_proxy:
- connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
- msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
- wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
- with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
- return wait_msg
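Callers consume the returned waiter as an iterator, along these lines (the topic, method and handle() helper are hypothetical):

    waiter = multicall(conf, context, 'metering',
                       {'method': 'list_meters', 'args': {}},
                       timeout=60, connection_pool=pool)
    for reply in waiter:
        handle(reply)   # one iteration per reply, until the 'ending' marker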
-
-
-def call(conf, context, topic, msg, timeout, connection_pool):
- """Sends a message on a topic and wait for a response."""
- rv = multicall(conf, context, topic, msg, timeout, connection_pool)
- # NOTE(vish): return the last result from the multicall
- rv = list(rv)
- if not rv:
- return
- return rv[-1]
-
-
-def cast(conf, context, topic, msg, connection_pool):
- """Sends a message on a topic without waiting for a response."""
- LOG.debug('Making asynchronous cast on %s...', topic)
- _add_unique_id(msg)
- pack_context(msg, context)
- with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, rpc_common.serialize_msg(msg))
-
-
-def fanout_cast(conf, context, topic, msg, connection_pool):
- """Sends a message on a fanout exchange without waiting for a response."""
- LOG.debug('Making asynchronous fanout cast...')
- _add_unique_id(msg)
- pack_context(msg, context)
- with ConnectionContext(conf, connection_pool) as conn:
- conn.fanout_send(topic, rpc_common.serialize_msg(msg))
-
-
-def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
- """Sends a message on a topic to a specific server."""
- _add_unique_id(msg)
- pack_context(msg, context)
- with ConnectionContext(conf, connection_pool, pooled=False,
- server_params=server_params) as conn:
- conn.topic_send(topic, rpc_common.serialize_msg(msg))
-
-
-def fanout_cast_to_server(conf, context, server_params, topic, msg,
- connection_pool):
- """Sends a message on a fanout exchange to a specific server."""
- _add_unique_id(msg)
- pack_context(msg, context)
- with ConnectionContext(conf, connection_pool, pooled=False,
- server_params=server_params) as conn:
- conn.fanout_send(topic, rpc_common.serialize_msg(msg))
-
-
-def notify(conf, context, topic, msg, connection_pool, envelope):
- """Sends a notification event on a topic."""
- LOG.debug('Sending %(event_type)s on %(topic)s',
- dict(event_type=msg.get('event_type'),
- topic=topic))
- _add_unique_id(msg)
- pack_context(msg, context)
- with ConnectionContext(conf, connection_pool) as conn:
- if envelope:
- msg = rpc_common.serialize_msg(msg)
- conn.notify_send(topic, msg)
-
-
-def cleanup(connection_pool):
- if connection_pool:
- connection_pool.empty()
-
-
-def get_control_exchange(conf):
- return conf.control_exchange
diff --git a/ceilometer/openstack/common/rpc/common.py b/ceilometer/openstack/common/rpc/common.py
deleted file mode 100644
index ba801eda..00000000
--- a/ceilometer/openstack/common/rpc/common.py
+++ /dev/null
@@ -1,508 +0,0 @@
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-# Copyright 2011 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-import sys
-import traceback
-
-from oslo.config import cfg
-import six
-
-from ceilometer.openstack.common.gettextutils import _, _LE
-from ceilometer.openstack.common import importutils
-from ceilometer.openstack.common import jsonutils
-from ceilometer.openstack.common import local
-from ceilometer.openstack.common import log as logging
-from ceilometer.openstack.common import versionutils
-
-
-CONF = cfg.CONF
-LOG = logging.getLogger(__name__)
-
-
-_RPC_ENVELOPE_VERSION = '2.0'
-'''RPC Envelope Version.
-
-This version number applies to the top level structure of messages sent out.
-It does *not* apply to the message payload, which must be versioned
-independently. For example, when using rpc APIs, a version number is applied
-for changes to the API being exposed over rpc. This version number is handled
-in the rpc proxy and dispatcher modules.
-
-This version number applies to the message envelope that is used in the
-serialization done inside the rpc layer. See serialize_msg() and
-deserialize_msg().
-
-The current message format (version 2.0) is very simple. It is::
-
- {
- 'oslo.version': <RPC Envelope Version as a String>,
- 'oslo.message': <Application Message Payload, JSON encoded>
- }
-
-Message format version '1.0' is just considered to be the messages we sent
-without a message envelope.
-
-So, the current message envelope just includes the envelope version. It may
-eventually contain additional information, such as a signature for the message
-payload.
-
-We will JSON encode the application message payload. The message envelope,
-which includes the JSON encoded application message body, will be passed down
-to the messaging libraries as a dict.
-'''
-
-_VERSION_KEY = 'oslo.version'
-_MESSAGE_KEY = 'oslo.message'
-
-_REMOTE_POSTFIX = '_Remote'
-
-
-class RPCException(Exception):
- msg_fmt = _("An unknown RPC related exception occurred.")
-
- def __init__(self, message=None, **kwargs):
- self.kwargs = kwargs
-
- if not message:
- try:
- message = self.msg_fmt % kwargs
-
- except Exception:
- # kwargs doesn't match a variable in the message
- # log the issue and the kwargs
- LOG.exception(_LE('Exception in string format operation'))
- for name, value in six.iteritems(kwargs):
- LOG.error("%s: %s" % (name, value))
- # at least get the core message out if something happened
- message = self.msg_fmt
-
- super(RPCException, self).__init__(message)
-
-
-class RemoteError(RPCException):
- """Signifies that a remote class has raised an exception.
-
- Contains a string representation of the type of the original exception,
- the value of the original exception, and the traceback. These are
- sent to the parent as a joined string so printing the exception
- contains all of the relevant info.
-
- """
- msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
-
- def __init__(self, exc_type=None, value=None, traceback=None):
- self.exc_type = exc_type
- self.value = value
- self.traceback = traceback
- super(RemoteError, self).__init__(exc_type=exc_type,
- value=value,
- traceback=traceback)
-
-
-class Timeout(RPCException):
- """Signifies that a timeout has occurred.
-
- This exception is raised if the rpc_response_timeout is reached while
- waiting for a response from the remote side.
- """
- msg_fmt = _('Timeout while waiting on RPC response - '
- 'topic: "%(topic)s", RPC method: "%(method)s" '
- 'info: "%(info)s"')
-
- def __init__(self, info=None, topic=None, method=None):
- """Initiates Timeout object.
-
- :param info: Extra info to convey to the user
- :param topic: The topic that the rpc call was sent to
- :param rpc_method_name: The name of the rpc method being
- called
- """
- self.info = info
- self.topic = topic
- self.method = method
- super(Timeout, self).__init__(
- None,
- info=info or _('<unknown>'),
- topic=topic or _('<unknown>'),
- method=method or _('<unknown>'))
-
-
-class DuplicateMessageError(RPCException):
- msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
-
-
-class InvalidRPCConnectionReuse(RPCException):
- msg_fmt = _("Invalid reuse of an RPC connection.")
-
-
-class UnsupportedRpcVersion(RPCException):
- msg_fmt = _("Specified RPC version, %(version)s, not supported by "
- "this endpoint.")
-
-
-class UnsupportedRpcEnvelopeVersion(RPCException):
- msg_fmt = _("Specified RPC envelope version, %(version)s, "
- "not supported by this endpoint.")
-
-
-class RpcVersionCapError(RPCException):
- msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
-
-
-class Connection(object):
- """A connection, returned by rpc.create_connection().
-
- This class represents a connection to the message bus used for rpc.
- An instance of this class should never be created by users of the rpc API.
- Use rpc.create_connection() instead.
- """
- def close(self):
- """Close the connection.
-
- This method must be called when the connection will no longer be used.
- It will ensure that any resources associated with the connection, such
- as a network connection, are cleaned up.
- """
- raise NotImplementedError()
-
- def create_consumer(self, topic, proxy, fanout=False):
- """Create a consumer on this connection.
-
- A consumer is associated with a message queue on the backend message
- bus. The consumer will read messages from the queue, unpack them, and
- dispatch them to the proxy object. The contents of the message pulled
- off of the queue will determine which method gets called on the proxy
- object.
-
- :param topic: This is a name associated with what to consume from.
- Multiple instances of a service may consume from the same
- topic. For example, all instances of nova-compute consume
- from a queue called "compute". In that case, the
- messages will get distributed amongst the consumers in a
- round-robin fashion if fanout=False. If fanout=True,
- every consumer associated with this topic will get a
- copy of every message.
- :param proxy: The object that will handle all incoming messages.
- :param fanout: Whether or not this is a fanout topic. See the
- documentation for the topic parameter for some
- additional comments on this.
- """
- raise NotImplementedError()
-
- def create_worker(self, topic, proxy, pool_name):
- """Create a worker on this connection.
-
- A worker is like a regular consumer of messages directed to a
- topic, except that it is part of a set of such consumers (the
- "pool") which may run in parallel. Every pool of workers will
- receive a given message, but only one worker in the pool will
- be asked to process it. Load is distributed across the members
- of the pool in round-robin fashion.
-
- :param topic: This is a name associated with what to consume from.
- Multiple instances of a service may consume from the same
- topic.
- :param proxy: The object that will handle all incoming messages.
- :param pool_name: String containing the name of the pool of workers
- """
- raise NotImplementedError()
-
- def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
- """Register as a member of a group of consumers.
-
- Uses given topic from the specified exchange.
- Exactly one member of a given pool will receive each message.
-
- A message will be delivered to multiple pools, if more than
- one is created.
-
- :param callback: Callable to be invoked for each message.
- :type callback: callable accepting one argument
- :param pool_name: The name of the consumer pool.
- :type pool_name: str
- :param topic: The routing topic for desired messages.
- :type topic: str
- :param exchange_name: The name of the message exchange where
- the client should attach. Defaults to
- the configured exchange.
- :type exchange_name: str
- """
- raise NotImplementedError()
-
- def consume_in_thread(self):
- """Spawn a thread to handle incoming messages.
-
- Spawn a thread that will be responsible for handling all incoming
- messages for consumers that were set up on this connection.
-
- Message dispatching inside of this is expected to be implemented in a
- non-blocking manner. An example implementation would be having this
- thread pull messages in for all of the consumers, but utilize a thread
- pool for dispatching the messages to the proxy objects.
- """
- raise NotImplementedError()
-
-
-def _safe_log(log_func, msg, msg_data):
- """Sanitizes the msg_data field before logging."""
- SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
-
- def _fix_passwords(d):
- """Sanitizes the password fields in the dictionary."""
- for k in six.iterkeys(d):
- if k.lower().find('password') != -1:
- d[k] = '<SANITIZED>'
- elif k.lower() in SANITIZE:
- d[k] = '<SANITIZED>'
- elif isinstance(d[k], list):
- for e in d[k]:
- if isinstance(e, dict):
- _fix_passwords(e)
- elif isinstance(d[k], dict):
- _fix_passwords(d[k])
- return d
-
- return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
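For example, a payload carrying credential-like keys would be logged with those values masked (the payload below is illustrative):

    data = {'method': 'create', 'auth_token': 'secret',
            'args': {'admin_password': 'p4ss'}}
    _safe_log(LOG.debug, 'received %s', data)
    # logs roughly: received {'method': 'create', 'auth_token': '<SANITIZED>',
    #                         'args': {'admin_password': '<SANITIZED>'}}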
-
-
-def serialize_remote_exception(failure_info, log_failure=True):
- """Prepares exception data to be sent over rpc.
-
- Failure_info should be a sys.exc_info() tuple.
-
- """
- tb = traceback.format_exception(*failure_info)
- failure = failure_info[1]
- if log_failure:
- LOG.error(_LE("Returning exception %s to caller"),
- six.text_type(failure))
- LOG.error(tb)
-
- kwargs = {}
- if hasattr(failure, 'kwargs'):
- kwargs = failure.kwargs
-
- # NOTE(matiu): With cells, it's possible to re-raise remote, remote
- # exceptions. Let's turn it back into the original exception type.
- cls_name = str(failure.__class__.__name__)
- mod_name = str(failure.__class__.__module__)
- if (cls_name.endswith(_REMOTE_POSTFIX) and
- mod_name.endswith(_REMOTE_POSTFIX)):
- cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
- mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
-
- data = {
- 'class': cls_name,
- 'module': mod_name,
- 'message': six.text_type(failure),
- 'tb': tb,
- 'args': failure.args,
- 'kwargs': kwargs
- }
-
- json_data = jsonutils.dumps(data)
-
- return json_data
-
-
-def deserialize_remote_exception(conf, data):
- failure = jsonutils.loads(str(data))
-
- trace = failure.get('tb', [])
- message = failure.get('message', "") + "\n" + "\n".join(trace)
- name = failure.get('class')
- module = failure.get('module')
-
- # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
- # order to prevent arbitrary code execution.
- if module not in conf.allowed_rpc_exception_modules:
- return RemoteError(name, failure.get('message'), trace)
-
- try:
- mod = importutils.import_module(module)
- klass = getattr(mod, name)
- if not issubclass(klass, Exception):
- raise TypeError("Can only deserialize Exceptions")
-
- failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
- except (AttributeError, TypeError, ImportError):
- return RemoteError(name, failure.get('message'), trace)
-
- ex_type = type(failure)
- str_override = lambda self: message
- new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
- {'__str__': str_override, '__unicode__': str_override})
- new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
- try:
- # NOTE(ameade): Dynamically create a new exception type and swap it in
- # as the new type for the exception. This only works on user defined
- # Exceptions and not core python exceptions. This is important because
- # we cannot necessarily change an exception message so we must override
- # the __str__ method.
- failure.__class__ = new_ex_type
- except TypeError:
- # NOTE(ameade): If a core exception then just add the traceback to the
- # first exception argument.
- failure.args = (message,) + failure.args[1:]
- return failure
-
-
-class CommonRpcContext(object):
- def __init__(self, **kwargs):
- self.values = kwargs
-
- def __getattr__(self, key):
- try:
- return self.values[key]
- except KeyError:
- raise AttributeError(key)
-
- def to_dict(self):
- return copy.deepcopy(self.values)
-
- @classmethod
- def from_dict(cls, values):
- return cls(**values)
-
- def deepcopy(self):
- return self.from_dict(self.to_dict())
-
- def update_store(self):
- local.store.context = self
-
- def elevated(self, read_deleted=None, overwrite=False):
- """Return a version of this context with admin flag set."""
- # TODO(russellb) This method is a bit of a nova-ism. It makes
- # some assumptions about the data in the request context sent
- # across rpc, while the rest of this class does not. We could get
- # rid of this if we changed the nova code that uses this to
- # convert the RpcContext back to its native RequestContext doing
- # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
-
- context = self.deepcopy()
- context.values['is_admin'] = True
-
- context.values.setdefault('roles', [])
-
- if 'admin' not in context.values['roles']:
- context.values['roles'].append('admin')
-
- if read_deleted is not None:
- context.values['read_deleted'] = read_deleted
-
- return context
-
-
-class ClientException(Exception):
- """Encapsulates actual exception expected to be hit by a RPC proxy object.
-
- Merely instantiating it records the current exception information, which
- will be passed back to the RPC client without exceptional logging.
- """
- def __init__(self):
- self._exc_info = sys.exc_info()
-
-
-def catch_client_exception(exceptions, func, *args, **kwargs):
- try:
- return func(*args, **kwargs)
- except Exception as e:
- if type(e) in exceptions:
- raise ClientException()
- else:
- raise
-
-
-def client_exceptions(*exceptions):
- """Decorator for manager methods that raise expected exceptions.
-
- Marking a Manager method with this decorator allows the declaration
- of expected exceptions that the RPC layer should not consider fatal,
- and not log as if they were generated in a real error scenario. Note
- that this will cause listed exceptions to be wrapped in a
- ClientException, which is used internally by the RPC layer.
- """
- def outer(func):
- def inner(*args, **kwargs):
- return catch_client_exception(exceptions, func, *args, **kwargs)
- return inner
- return outer
-
-
-# TODO(sirp): we should deprecate this in favor of
-# using `versionutils.is_compatible` directly
-def version_is_compatible(imp_version, version):
- """Determine whether versions are compatible.
-
- :param imp_version: The version implemented
- :param version: The version requested by an incoming message.
- """
- return versionutils.is_compatible(version, imp_version)
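In other words, a request is accepted when the major versions match and the requested minor version is not newer than the implemented one:

    version_is_compatible('1.3', '1.1')   # True  - older minor is fine
    version_is_compatible('1.3', '1.4')   # False - requested minor too new
    version_is_compatible('1.3', '2.0')   # False - major version mismatch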
-
-
-def serialize_msg(raw_msg):
- # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
- # information about this format.
- msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
- _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
-
- return msg
-
-
-def deserialize_msg(msg):
- # NOTE(russellb): Hang on to your hats, this road is about to
- # get a little bumpy.
- #
- # Robustness Principle:
- # "Be strict in what you send, liberal in what you accept."
- #
- # At this point we have to do a bit of guessing about what it
- # is we just received. Here is the set of possibilities:
- #
- # 1) We received a dict. This could be 2 things:
- #
- # a) Inspect it to see if it looks like a standard message envelope.
- # If so, great!
- #
- # b) If it doesn't look like a standard message envelope, it could either
- # be a notification, or a message from before we added a message
- # envelope (referred to as version 1.0).
- # Just return the message as-is.
- #
- # 2) It's any other non-dict type. Just return it and hope for the best.
- # This case covers return values from rpc.call() from before message
- # envelopes were used. (messages to call a method were always a dict)
-
- if not isinstance(msg, dict):
- # See #2 above.
- return msg
-
- base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
- if not all(map(lambda key: key in msg, base_envelope_keys)):
- # See #1.b above.
- return msg
-
- # At this point we think we have the message envelope
- # format we were expecting. (#1.a above)
-
- if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
- raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
-
- raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
-
- return raw_msg
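A quick round trip through the two helpers above, for illustration:

    env = serialize_msg({'method': 'echo', 'args': {'value': 42}})
    # env['oslo.version'] == '2.0'
    # env['oslo.message'] holds the JSON-encoded payload
    assert deserialize_msg(env) == {'method': 'echo', 'args': {'value': 42}}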
diff --git a/ceilometer/openstack/common/rpc/dispatcher.py b/ceilometer/openstack/common/rpc/dispatcher.py
deleted file mode 100644
index 455cd4b9..00000000
--- a/ceilometer/openstack/common/rpc/dispatcher.py
+++ /dev/null
@@ -1,178 +0,0 @@
-# Copyright 2012 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Code for rpc message dispatching.
-
-Messages that come in have a version number associated with them. RPC API
-version numbers are in the form:
-
- Major.Minor
-
-For a given message with version X.Y, the receiver must be marked as able to
-handle messages of version A.B, where:
-
- A = X
-
- B >= Y
-
-The Major version number would be incremented for an almost completely new API.
-The Minor version number would be incremented for backwards compatible changes
-to an existing API. A backwards compatible change could be something like
-adding a new method, adding an argument to an existing method (but not
-requiring it), or changing the type for an existing argument (but still
-handling the old type as well).
-
-The conversion over to a versioned API must be done on both the client side and
-server side of the API at the same time. However, as the code stands today,
-there can be both versioned and unversioned APIs implemented in the same code
-base.
-
-EXAMPLES
-========
-
-Nova was the first project to use versioned rpc APIs. Consider the compute rpc
-API as an example. The client side is in nova/compute/rpcapi.py and the server
-side is in nova/compute/manager.py.
-
-
-Example 1) Adding a new method.
--------------------------------
-
-Adding a new method is a backwards compatible change. It should be added to
-nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
-X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
-have a specific version specified to indicate the minimum API version that must
-be implemented for the method to be supported. For example::
-
- def get_host_uptime(self, ctxt, host):
- topic = _compute_topic(self.topic, ctxt, host, None)
- return self.call(ctxt, self.make_msg('get_host_uptime'), topic,
- version='1.1')
-
-In this case, version '1.1' is the first version that supported the
-get_host_uptime() method.
-
-
-Example 2) Adding a new parameter.
-----------------------------------
-
-Adding a new parameter to an rpc method can be made backwards compatible. The
-RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
-The implementation of the method must not expect the parameter to be present.::
-
- def some_remote_method(self, arg1, arg2, newarg=None):
- # The code needs to deal with newarg=None for cases
- # where an older client sends a message without it.
- pass
-
-On the client side, the same changes should be made as in example 1. The
-minimum version that supports the new parameter should be specified.
-"""
-
-import six
-
-from ceilometer.openstack.common.rpc import common as rpc_common
-from ceilometer.openstack.common.rpc import serializer as rpc_serializer
-
-
-class RpcDispatcher(object):
- """Dispatch rpc messages according to the requested API version.
-
- This class can be used as the top level 'manager' for a service. It
- contains a list of underlying managers that have an API_VERSION attribute.
- """
-
- def __init__(self, callbacks, serializer=None):
- """Initialize the rpc dispatcher.
-
- :param callbacks: List of proxy objects that are an instance
- of a class with rpc methods exposed. Each proxy
- object should have an RPC_API_VERSION attribute.
- :param serializer: The Serializer object that will be used to
- deserialize arguments before the method call and
- to serialize the result after it returns.
- """
- self.callbacks = callbacks
- if serializer is None:
- serializer = rpc_serializer.NoOpSerializer()
- self.serializer = serializer
- super(RpcDispatcher, self).__init__()
-
- def _deserialize_args(self, context, kwargs):
- """Helper method called to deserialize args before dispatch.
-
- This calls our serializer on each argument, returning a new set of
- args that have been deserialized.
-
- :param context: The request context
- :param kwargs: The arguments to be deserialized
- :returns: A new set of deserialized args
- """
- new_kwargs = dict()
- for argname, arg in six.iteritems(kwargs):
- new_kwargs[argname] = self.serializer.deserialize_entity(context,
- arg)
- return new_kwargs
-
- def dispatch(self, ctxt, version, method, namespace, **kwargs):
- """Dispatch a message based on a requested version.
-
- :param ctxt: The request context
- :param version: The requested API version from the incoming message
- :param method: The method requested to be called by the incoming
- message.
- :param namespace: The namespace for the requested method. If None,
- the dispatcher will look for a method on a callback
- object with no namespace set.
- :param kwargs: A dict of keyword arguments to be passed to the method.
-
- :returns: Whatever is returned by the underlying method that gets
- called.
- """
- if not version:
- version = '1.0'
-
- had_compatible = False
- for proxyobj in self.callbacks:
- # Check for namespace compatibility
- try:
- cb_namespace = proxyobj.RPC_API_NAMESPACE
- except AttributeError:
- cb_namespace = None
-
- if namespace != cb_namespace:
- continue
-
- # Check for version compatibility
- try:
- rpc_api_version = proxyobj.RPC_API_VERSION
- except AttributeError:
- rpc_api_version = '1.0'
-
- is_compatible = rpc_common.version_is_compatible(rpc_api_version,
- version)
- had_compatible = had_compatible or is_compatible
-
- if not hasattr(proxyobj, method):
- continue
- if is_compatible:
- kwargs = self._deserialize_args(ctxt, kwargs)
- result = getattr(proxyobj, method)(ctxt, **kwargs)
- return self.serializer.serialize_entity(ctxt, result)
-
- if had_compatible:
- raise AttributeError("No such RPC function '%s'" % method)
- else:
- raise rpc_common.UnsupportedRpcVersion(version=version)
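A compressed, hypothetical sketch of how the dispatcher ties the version rules above together (the endpoint class and sample data are assumptions):

    class MeterEndpoint(object):
        RPC_API_VERSION = '1.1'

        def record_metering_data(self, ctxt, data):
            return len(data)

    dispatcher = RpcDispatcher([MeterEndpoint()])
    ctxt = {}   # passed through untouched by the default NoOpSerializer
    # a client speaking API version 1.0 is still served by the 1.1 endpoint
    dispatcher.dispatch(ctxt, '1.0', 'record_metering_data', None,
                        data=[{'counter_name': 'cpu'}])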
diff --git a/ceilometer/openstack/common/rpc/impl_fake.py b/ceilometer/openstack/common/rpc/impl_fake.py
deleted file mode 100644
index e77db6c7..00000000
--- a/ceilometer/openstack/common/rpc/impl_fake.py
+++ /dev/null
@@ -1,195 +0,0 @@
-# Copyright 2011 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""Fake RPC implementation which calls proxy methods directly with no
-queues. Casts will block, but this is very useful for tests.
-"""
-
-import inspect
-# NOTE(russellb): We specifically want to use json, not our own jsonutils.
-# jsonutils has some extra logic to automatically convert objects to primitive
-# types so that they can be serialized. We want to catch all cases where
-# non-primitive types make it into this code and treat it as an error.
-import json
-import time
-
-import eventlet
-import six
-
-from ceilometer.openstack.common.rpc import common as rpc_common
-
-CONSUMERS = {}
-
-
-class RpcContext(rpc_common.CommonRpcContext):
- def __init__(self, **kwargs):
- super(RpcContext, self).__init__(**kwargs)
- self._response = []
- self._done = False
-
- def deepcopy(self):
- values = self.to_dict()
- new_inst = self.__class__(**values)
- new_inst._response = self._response
- new_inst._done = self._done
- return new_inst
-
- def reply(self, reply=None, failure=None, ending=False):
- if ending:
- self._done = True
- if not self._done:
- self._response.append((reply, failure))
-
-
-class Consumer(object):
- def __init__(self, topic, proxy):
- self.topic = topic
- self.proxy = proxy
-
- def call(self, context, version, method, namespace, args, timeout):
- done = eventlet.event.Event()
-
- def _inner():
- ctxt = RpcContext.from_dict(context.to_dict())
- try:
- rval = self.proxy.dispatch(context, version, method,
- namespace, **args)
- res = []
- # Caller might have called ctxt.reply() manually
- for (reply, failure) in ctxt._response:
- if failure:
- six.reraise(failure[0], failure[1], failure[2])
- res.append(reply)
- # if ending not 'sent'...we might have more data to
- # return from the function itself
- if not ctxt._done:
- if inspect.isgenerator(rval):
- for val in rval:
- res.append(val)
- else:
- res.append(rval)
- done.send(res)
- except rpc_common.ClientException as e:
- done.send_exception(e._exc_info[1])
- except Exception as e:
- done.send_exception(e)
-
- thread = eventlet.greenthread.spawn(_inner)
-
- if timeout:
- start_time = time.time()
- while not done.ready():
- eventlet.greenthread.sleep(1)
- cur_time = time.time()
- if (cur_time - start_time) > timeout:
- thread.kill()
- raise rpc_common.Timeout()
-
- return done.wait()
-
-
-class Connection(object):
- """Connection object."""
-
- def __init__(self):
- self.consumers = []
-
- def create_consumer(self, topic, proxy, fanout=False):
- consumer = Consumer(topic, proxy)
- self.consumers.append(consumer)
- if topic not in CONSUMERS:
- CONSUMERS[topic] = []
- CONSUMERS[topic].append(consumer)
-
- def close(self):
- for consumer in self.consumers:
- CONSUMERS[consumer.topic].remove(consumer)
- self.consumers = []
-
- def consume_in_thread(self):
- pass
-
-
-def create_connection(conf, new=True):
- """Create a connection."""
- return Connection()
-
-
-def check_serialize(msg):
- """Make sure a message intended for rpc can be serialized."""
- json.dumps(msg)
-
-
-def multicall(conf, context, topic, msg, timeout=None):
- """Make a call that returns multiple times."""
-
- check_serialize(msg)
-
- method = msg.get('method')
- if not method:
- return
- args = msg.get('args', {})
- version = msg.get('version')
- namespace = msg.get('namespace')
-
- try:
- consumer = CONSUMERS[topic][0]
- except (KeyError, IndexError):
- raise rpc_common.Timeout("No consumers available")
- else:
- return consumer.call(context, version, method, namespace, args,
- timeout)
-
-
-def call(conf, context, topic, msg, timeout=None):
- """Sends a message on a topic and wait for a response."""
- rv = multicall(conf, context, topic, msg, timeout)
- # NOTE(vish): return the last result from the multicall
- rv = list(rv)
- if not rv:
- return
- return rv[-1]
-
-
-def cast(conf, context, topic, msg):
- check_serialize(msg)
- try:
- call(conf, context, topic, msg)
- except Exception:
- pass
-
-
-def notify(conf, context, topic, msg, envelope):
- check_serialize(msg)
-
-
-def cleanup():
- pass
-
-
-def fanout_cast(conf, context, topic, msg):
- """Cast to all consumers of a topic."""
- check_serialize(msg)
- method = msg.get('method')
- if not method:
- return
- args = msg.get('args', {})
- version = msg.get('version')
- namespace = msg.get('namespace')
-
- for consumer in CONSUMERS.get(topic, []):
- try:
- consumer.call(context, version, method, namespace, args, None)
- except Exception:
- pass
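A sketch of how this fake driver tends to be exercised in tests, using the module-level helpers above; EchoProxy and the context are assumptions for illustration:

    class EchoProxy(object):
        def dispatch(self, ctxt, version, method, namespace, **kwargs):
            return kwargs['value']

    ctxt = rpc_common.CommonRpcContext(user_id='test')
    conn = create_connection(None)     # conf is not used by the fake driver
    conn.create_consumer('testtopic', EchoProxy())
    call(None, ctxt, 'testtopic',
         {'method': 'echo', 'args': {'value': 42}})   # returns 42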
diff --git a/ceilometer/openstack/common/rpc/impl_kombu.py b/ceilometer/openstack/common/rpc/impl_kombu.py
deleted file mode 100644
index 697e4b5c..00000000
--- a/ceilometer/openstack/common/rpc/impl_kombu.py
+++ /dev/null
@@ -1,858 +0,0 @@
-# Copyright 2011 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import functools
-import itertools
-import socket
-import ssl
-import time
-import uuid
-
-import eventlet
-import greenlet
-import kombu
-import kombu.connection
-import kombu.entity
-import kombu.messaging
-from oslo.config import cfg
-import six
-
-from ceilometer.openstack.common import excutils
-from ceilometer.openstack.common.gettextutils import _, _LE, _LI
-from ceilometer.openstack.common import network_utils
-from ceilometer.openstack.common.rpc import amqp as rpc_amqp
-from ceilometer.openstack.common.rpc import common as rpc_common
-from ceilometer.openstack.common import sslutils
-
-kombu_opts = [
- cfg.StrOpt('kombu_ssl_version',
- default='',
- help='If SSL is enabled, the SSL version to use. Valid '
- 'values are TLSv1, SSLv23 and SSLv3. SSLv2 might '
- 'be available on some distributions.'
- ),
- cfg.StrOpt('kombu_ssl_keyfile',
- default='',
- help='SSL key file (valid only if SSL enabled)'),
- cfg.StrOpt('kombu_ssl_certfile',
- default='',
- help='SSL cert file (valid only if SSL enabled)'),
- cfg.StrOpt('kombu_ssl_ca_certs',
- default='',
- help=('SSL certification authority file '
- '(valid only if SSL enabled)')),
- cfg.StrOpt('rabbit_host',
- default='localhost',
- help='The RabbitMQ broker address where a single node is used'),
- cfg.IntOpt('rabbit_port',
- default=5672,
- help='The RabbitMQ broker port where a single node is used'),
- cfg.ListOpt('rabbit_hosts',
- default=['$rabbit_host:$rabbit_port'],
- help='RabbitMQ HA cluster host:port pairs'),
- cfg.BoolOpt('rabbit_use_ssl',
- default=False,
- help='Connect over SSL for RabbitMQ'),
- cfg.StrOpt('rabbit_userid',
- default='guest',
- help='The RabbitMQ userid'),
- cfg.StrOpt('rabbit_password',
- default='guest',
- help='The RabbitMQ password',
- secret=True),
- cfg.StrOpt('rabbit_virtual_host',
- default='/',
- help='The RabbitMQ virtual host'),
- cfg.IntOpt('rabbit_retry_interval',
- default=1,
- help='How frequently to retry connecting with RabbitMQ'),
- cfg.IntOpt('rabbit_retry_backoff',
- default=2,
- help='How long to backoff for between retries when connecting '
- 'to RabbitMQ'),
- cfg.IntOpt('rabbit_max_retries',
- default=0,
- help='Maximum number of RabbitMQ connection retries. '
- 'Default is 0 (infinite retry count)'),
- cfg.BoolOpt('rabbit_ha_queues',
- default=False,
- help='Use HA queues in RabbitMQ (x-ha-policy: all). '
- 'If you change this option, you must wipe the '
- 'RabbitMQ database.'),
-
-]
-
-cfg.CONF.register_opts(kombu_opts)
-
-LOG = rpc_common.LOG
-
-
-def _get_queue_arguments(conf):
- """Construct the arguments for declaring a queue.
-
- If the rabbit_ha_queues option is set, we declare a mirrored queue
- as described here:
-
- http://www.rabbitmq.com/ha.html
-
- Setting x-ha-policy to all means that the queue will be mirrored
- to all nodes in the cluster.
- """
- return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
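So with rabbit_ha_queues set to True in the configuration, every queue this driver declares would get, roughly:

    _get_queue_arguments(conf)   # -> {'x-ha-policy': 'all'}
    # and {} when rabbit_ha_queues is left at its default of False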
-
-
-class ConsumerBase(object):
- """Consumer base class."""
-
- def __init__(self, channel, callback, tag, **kwargs):
- """Declare a queue on an amqp channel.
-
- 'channel' is the amqp channel to use
- 'callback' is the callback to call when messages are received
- 'tag' is a unique ID for the consumer on the channel
-
- queue name, exchange name, and other kombu options are
- passed in here as a dictionary.
- """
- self.callback = callback
- self.tag = str(tag)
- self.kwargs = kwargs
- self.queue = None
- self.ack_on_error = kwargs.get('ack_on_error', True)
- self.reconnect(channel)
-
- def reconnect(self, channel):
- """Re-declare the queue after a rabbit reconnect."""
- self.channel = channel
- self.kwargs['channel'] = channel
- self.queue = kombu.entity.Queue(**self.kwargs)
- self.queue.declare()
-
- def _callback_handler(self, message, callback):
- """Call callback with deserialized message.
-
- Messages that are processed without exception are ack'ed.
-
- If the message processing generates an exception, it will be
- ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
- """
-
- try:
- msg = rpc_common.deserialize_msg(message.payload)
- callback(msg)
- except Exception:
- if self.ack_on_error:
- LOG.exception(_LE("Failed to process message"
- " ... skipping it."))
- message.ack()
- else:
- LOG.exception(_LE("Failed to process message"
- " ... will requeue."))
- message.requeue()
- else:
- message.ack()
-
- def consume(self, *args, **kwargs):
- """Actually declare the consumer on the amqp channel. This will
- start the flow of messages from the queue. Using the
- Connection.iterconsume() iterator will process the messages,
- calling the appropriate callback.
-
- If a callback is specified in kwargs, use that. Otherwise,
- use the callback passed during __init__()
-
- If kwargs['nowait'] is True, then this call will block until
- a message is read.
-
- """
-
- options = {'consumer_tag': self.tag}
- options['nowait'] = kwargs.get('nowait', False)
- callback = kwargs.get('callback', self.callback)
- if not callback:
- raise ValueError("No callback defined")
-
- def _callback(raw_message):
- message = self.channel.message_to_python(raw_message)
- self._callback_handler(message, callback)
-
- self.queue.consume(*args, callback=_callback, **options)
-
- def cancel(self):
- """Cancel the consuming from the queue, if it has started."""
- try:
- self.queue.cancel(self.tag)
- except KeyError as e:
- # NOTE(comstud): Kludge to get around an amqplib bug
- if str(e) != "u'%s'" % self.tag:
- raise
- self.queue = None
-
-
-class DirectConsumer(ConsumerBase):
- """Queue/consumer class for 'direct'."""
-
- def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
- """Init a 'direct' queue.
-
- 'channel' is the amqp channel to use
- 'msg_id' is the msg_id to listen on
- 'callback' is the callback to call when messages are received
- 'tag' is a unique ID for the consumer on the channel
-
- Other kombu options may be passed
- """
- # Default options
- options = {'durable': False,
- 'queue_arguments': _get_queue_arguments(conf),
- 'auto_delete': True,
- 'exclusive': False}
- options.update(kwargs)
- exchange = kombu.entity.Exchange(name=msg_id,
- type='direct',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
- super(DirectConsumer, self).__init__(channel,
- callback,
- tag,
- name=msg_id,
- exchange=exchange,
- routing_key=msg_id,
- **options)
-
-
-class TopicConsumer(ConsumerBase):
- """Consumer class for 'topic'."""
-
- def __init__(self, conf, channel, topic, callback, tag, name=None,
- exchange_name=None, **kwargs):
- """Init a 'topic' queue.
-
- :param channel: the amqp channel to use
- :param topic: the topic to listen on
- :paramtype topic: str
- :param callback: the callback to call when messages are received
- :param tag: a unique ID for the consumer on the channel
- :param name: optional queue name, defaults to topic
- :paramtype name: str
-
- Other kombu options may be passed as keyword arguments
- """
- # Default options
- options = {'durable': conf.amqp_durable_queues,
- 'queue_arguments': _get_queue_arguments(conf),
- 'auto_delete': conf.amqp_auto_delete,
- 'exclusive': False}
- options.update(kwargs)
- exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
- exchange = kombu.entity.Exchange(name=exchange_name,
- type='topic',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
- super(TopicConsumer, self).__init__(channel,
- callback,
- tag,
- name=name or topic,
- exchange=exchange,
- routing_key=topic,
- **options)
-
-
-class FanoutConsumer(ConsumerBase):
- """Consumer class for 'fanout'."""
-
- def __init__(self, conf, channel, topic, callback, tag, **kwargs):
- """Init a 'fanout' queue.
-
- 'channel' is the amqp channel to use
- 'topic' is the topic to listen on
- 'callback' is the callback to call when messages are received
- 'tag' is a unique ID for the consumer on the channel
-
- Other kombu options may be passed
- """
- unique = uuid.uuid4().hex
- exchange_name = '%s_fanout' % topic
- queue_name = '%s_fanout_%s' % (topic, unique)
-
- # Default options
- options = {'durable': False,
- 'queue_arguments': _get_queue_arguments(conf),
- 'auto_delete': True,
- 'exclusive': False}
- options.update(kwargs)
- exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
- durable=options['durable'],
- auto_delete=options['auto_delete'])
- super(FanoutConsumer, self).__init__(channel, callback, tag,
- name=queue_name,
- exchange=exchange,
- routing_key=topic,
- **options)
-
-
-class Publisher(object):
- """Base Publisher class."""
-
- def __init__(self, channel, exchange_name, routing_key, **kwargs):
- """Init the Publisher class with the exchange_name, routing_key,
- and other options
- """
- self.exchange_name = exchange_name
- self.routing_key = routing_key
- self.kwargs = kwargs
- self.reconnect(channel)
-
- def reconnect(self, channel):
- """Re-establish the Producer after a rabbit reconnection."""
- self.exchange = kombu.entity.Exchange(name=self.exchange_name,
- **self.kwargs)
- self.producer = kombu.messaging.Producer(exchange=self.exchange,
- channel=channel,
- routing_key=self.routing_key)
-
- def send(self, msg, timeout=None):
- """Send a message."""
- if timeout:
- #
- # AMQP TTL is in milliseconds when set in the header.
- #
- self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
- else:
- self.producer.publish(msg)
-
-
-class DirectPublisher(Publisher):
- """Publisher class for 'direct'."""
- def __init__(self, conf, channel, msg_id, **kwargs):
- """init a 'direct' publisher.
-
- Kombu options may be passed as keyword args to override defaults
- """
-
- options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': False}
- options.update(kwargs)
- super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
- type='direct', **options)
-
-
-class TopicPublisher(Publisher):
- """Publisher class for 'topic'."""
- def __init__(self, conf, channel, topic, **kwargs):
- """init a 'topic' publisher.
-
- Kombu options may be passed as keyword args to override defaults
- """
- options = {'durable': conf.amqp_durable_queues,
- 'auto_delete': conf.amqp_auto_delete,
- 'exclusive': False}
- options.update(kwargs)
- exchange_name = rpc_amqp.get_control_exchange(conf)
- super(TopicPublisher, self).__init__(channel,
- exchange_name,
- topic,
- type='topic',
- **options)
-
-
-class FanoutPublisher(Publisher):
- """Publisher class for 'fanout'."""
- def __init__(self, conf, channel, topic, **kwargs):
- """init a 'fanout' publisher.
-
- Kombu options may be passed as keyword args to override defaults
- """
- options = {'durable': False,
- 'auto_delete': True,
- 'exclusive': False}
- options.update(kwargs)
- super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
- None, type='fanout', **options)
-
-
-class NotifyPublisher(TopicPublisher):
- """Publisher class for 'notify'."""
-
- def __init__(self, conf, channel, topic, **kwargs):
- self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
- self.queue_arguments = _get_queue_arguments(conf)
- super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
-
- def reconnect(self, channel):
- super(NotifyPublisher, self).reconnect(channel)
-
- # NOTE(jerdfelt): Normally the consumer would create the queue, but
- # we do this to ensure that messages don't get dropped if the
- # consumer is started after we do
- queue = kombu.entity.Queue(channel=channel,
- exchange=self.exchange,
- durable=self.durable,
- name=self.routing_key,
- routing_key=self.routing_key,
- queue_arguments=self.queue_arguments)
- queue.declare()
-
-
-class Connection(object):
- """Connection object."""
-
- pool = None
-
- def __init__(self, conf, server_params=None):
- self.consumers = []
- self.consumer_thread = None
- self.proxy_callbacks = []
- self.conf = conf
- self.max_retries = self.conf.rabbit_max_retries
- # Try forever?
- if self.max_retries <= 0:
- self.max_retries = None
- self.interval_start = self.conf.rabbit_retry_interval
- self.interval_stepping = self.conf.rabbit_retry_backoff
- # max retry-interval = 30 seconds
- self.interval_max = 30
- self.memory_transport = False
-
- if server_params is None:
- server_params = {}
- # Keys to translate from server_params to kombu params
- server_params_to_kombu_params = {'username': 'userid'}
-
- ssl_params = self._fetch_ssl_params()
- params_list = []
- for adr in self.conf.rabbit_hosts:
- hostname, port = network_utils.parse_host_port(
- adr, default_port=self.conf.rabbit_port)
-
- params = {
- 'hostname': hostname,
- 'port': port,
- 'userid': self.conf.rabbit_userid,
- 'password': self.conf.rabbit_password,
- 'virtual_host': self.conf.rabbit_virtual_host,
- }
-
- for sp_key, value in six.iteritems(server_params):
- p_key = server_params_to_kombu_params.get(sp_key, sp_key)
- params[p_key] = value
-
- if self.conf.fake_rabbit:
- params['transport'] = 'memory'
- if self.conf.rabbit_use_ssl:
- params['ssl'] = ssl_params
-
- params_list.append(params)
-
- self.params_list = params_list
-
- brokers_count = len(self.params_list)
- self.next_broker_indices = itertools.cycle(range(brokers_count))
-
- self.memory_transport = self.conf.fake_rabbit
-
- self.connection = None
- self.reconnect()
-
- def _fetch_ssl_params(self):
- """Handles fetching what ssl params should be used for the connection
- (if any).
- """
- ssl_params = dict()
-
- # http://docs.python.org/library/ssl.html - ssl.wrap_socket
- if self.conf.kombu_ssl_version:
- ssl_params['ssl_version'] = sslutils.validate_ssl_version(
- self.conf.kombu_ssl_version)
- if self.conf.kombu_ssl_keyfile:
- ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
- if self.conf.kombu_ssl_certfile:
- ssl_params['certfile'] = self.conf.kombu_ssl_certfile
- if self.conf.kombu_ssl_ca_certs:
- ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
- # We might want to allow variations in the
- # future with this?
- ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
-
- # Return the extended behavior or just have the default behavior
- return ssl_params or True
-
- def _connect(self, params):
- """Connect to rabbit. Re-establish any queues that may have
- been declared before if we are reconnecting. Exceptions should
- be handled by the caller.
- """
- if self.connection:
- LOG.info(_LI("Reconnecting to AMQP server on "
- "%(hostname)s:%(port)d") % params)
- try:
- self.connection.release()
- except self.connection_errors:
- pass
- # Setting this in case the next statement fails, though
- # it shouldn't be doing any network operations, yet.
- self.connection = None
- self.connection = kombu.connection.BrokerConnection(**params)
- self.connection_errors = self.connection.connection_errors
- if self.memory_transport:
- # Kludge to speed up tests.
- self.connection.transport.polling_interval = 0.0
- self.consumer_num = itertools.count(1)
- self.connection.connect()
- self.channel = self.connection.channel()
- # work around 'memory' transport bug in 1.1.3
- if self.memory_transport:
- self.channel._new_queue('ae.undeliver')
- for consumer in self.consumers:
- consumer.reconnect(self.channel)
- LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d') %
- params)
-
- def reconnect(self):
- """Handles reconnecting and re-establishing queues.
- Will retry up to self.max_retries number of times.
- self.max_retries = 0 means to retry forever.
- Sleep between tries, starting at self.interval_start
- seconds, backing off self.interval_stepping number of seconds
- each attempt.
- """
-
- attempt = 0
- while True:
- params = self.params_list[next(self.next_broker_indices)]
- attempt += 1
- try:
- self._connect(params)
- return
- except (IOError, self.connection_errors) as e:
- pass
- except Exception as e:
- # NOTE(comstud): Unfortunately it's possible for amqplib
- # to return an error not covered by its transport
- # connection_errors in the case of a timeout waiting for
- # a protocol response. (See paste link in LP888621)
- # So, we check all exceptions for 'timeout' in them
- # and try to reconnect in this case.
- if 'timeout' not in str(e):
- raise
-
- log_info = {}
- log_info['err_str'] = str(e)
- log_info['max_retries'] = self.max_retries
- log_info.update(params)
-
- if self.max_retries and attempt == self.max_retries:
- msg = _('Unable to connect to AMQP server on '
- '%(hostname)s:%(port)d after %(max_retries)d '
- 'tries: %(err_str)s') % log_info
- LOG.error(msg)
- raise rpc_common.RPCException(msg)
-
- if attempt == 1:
- sleep_time = self.interval_start or 1
- elif attempt > 1:
- sleep_time += self.interval_stepping
- if self.interval_max:
- sleep_time = min(sleep_time, self.interval_max)
-
- log_info['sleep_time'] = sleep_time
- LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is '
- 'unreachable: %(err_str)s. Trying again in '
- '%(sleep_time)d seconds.') % log_info)
- time.sleep(sleep_time)
-
- def ensure(self, error_callback, method, *args, **kwargs):
- while True:
- try:
- return method(*args, **kwargs)
- except (self.connection_errors, socket.timeout, IOError) as e:
- if error_callback:
- error_callback(e)
- except Exception as e:
- # NOTE(comstud): Unfortunately it's possible for amqplib
- # to return an error not covered by its transport
- # connection_errors in the case of a timeout waiting for
- # a protocol response. (See paste link in LP888621)
- # So, we check all exceptions for 'timeout' in them
- # and try to reconnect in this case.
- if 'timeout' not in str(e):
- raise
- if error_callback:
- error_callback(e)
- self.reconnect()
-
- def get_channel(self):
- """Convenience call for bin/clear_rabbit_queues."""
- return self.channel
-
- def close(self):
- """Close/release this connection."""
- self.cancel_consumer_thread()
- self.wait_on_proxy_callbacks()
- self.connection.release()
- self.connection = None
-
- def reset(self):
- """Reset a connection so it can be used again."""
- self.cancel_consumer_thread()
- self.wait_on_proxy_callbacks()
- self.channel.close()
- self.channel = self.connection.channel()
- # work around 'memory' transport bug in 1.1.3
- if self.memory_transport:
- self.channel._new_queue('ae.undeliver')
- self.consumers = []
-
- def declare_consumer(self, consumer_cls, topic, callback):
- """Create a Consumer using the class that was passed in and
- add it to our list of consumers
- """
-
- def _connect_error(exc):
- log_info = {'topic': topic, 'err_str': str(exc)}
- LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
- "%(err_str)s") % log_info)
-
- def _declare_consumer():
- consumer = consumer_cls(self.conf, self.channel, topic, callback,
- six.next(self.consumer_num))
- self.consumers.append(consumer)
- return consumer
-
- return self.ensure(_connect_error, _declare_consumer)
-
- def iterconsume(self, limit=None, timeout=None):
- """Return an iterator that will consume from all queues/consumers."""
-
- info = {'do_consume': True}
-
- def _error_callback(exc):
- if isinstance(exc, socket.timeout):
- LOG.debug('Timed out waiting for RPC response: %s' %
- str(exc))
- raise rpc_common.Timeout()
- else:
- LOG.exception(_LE('Failed to consume message from queue: %s') %
- str(exc))
- info['do_consume'] = True
-
- def _consume():
- if info['do_consume']:
- queues_head = self.consumers[:-1] # not fanout.
- queues_tail = self.consumers[-1] # fanout
- for queue in queues_head:
- queue.consume(nowait=True)
- queues_tail.consume(nowait=False)
- info['do_consume'] = False
- return self.connection.drain_events(timeout=timeout)
-
- for iteration in itertools.count(0):
- if limit and iteration >= limit:
- raise StopIteration
- yield self.ensure(_error_callback, _consume)
-
- def cancel_consumer_thread(self):
- """Cancel a consumer thread."""
- if self.consumer_thread is not None:
- self.consumer_thread.kill()
- try:
- self.consumer_thread.wait()
- except greenlet.GreenletExit:
- pass
- self.consumer_thread = None
-
- def wait_on_proxy_callbacks(self):
- """Wait for all proxy callback threads to exit."""
- for proxy_cb in self.proxy_callbacks:
- proxy_cb.wait()
-
- def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
- """Send to a publisher based on the publisher class."""
-
- def _error_callback(exc):
- log_info = {'topic': topic, 'err_str': str(exc)}
- LOG.exception(_LE("Failed to publish message to topic "
- "'%(topic)s': %(err_str)s") % log_info)
-
- def _publish():
- publisher = cls(self.conf, self.channel, topic, **kwargs)
- publisher.send(msg, timeout)
-
- self.ensure(_error_callback, _publish)
-
- def declare_direct_consumer(self, topic, callback):
- """Create a 'direct' queue.
- In nova's use, this is generally a msg_id queue used for
- responses for call/multicall
- """
- self.declare_consumer(DirectConsumer, topic, callback)
-
- def declare_topic_consumer(self, topic, callback=None, queue_name=None,
- exchange_name=None, ack_on_error=True):
- """Create a 'topic' consumer."""
- self.declare_consumer(functools.partial(TopicConsumer,
- name=queue_name,
- exchange_name=exchange_name,
- ack_on_error=ack_on_error,
- ),
- topic, callback)
-
- def declare_fanout_consumer(self, topic, callback):
- """Create a 'fanout' consumer."""
- self.declare_consumer(FanoutConsumer, topic, callback)
-
- def direct_send(self, msg_id, msg):
- """Send a 'direct' message."""
- self.publisher_send(DirectPublisher, msg_id, msg)
-
- def topic_send(self, topic, msg, timeout=None):
- """Send a 'topic' message."""
- self.publisher_send(TopicPublisher, topic, msg, timeout)
-
- def fanout_send(self, topic, msg):
- """Send a 'fanout' message."""
- self.publisher_send(FanoutPublisher, topic, msg)
-
- def notify_send(self, topic, msg, **kwargs):
- """Send a notify message on a topic."""
- self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
-
- def consume(self, limit=None):
- """Consume from all queues/consumers."""
- it = self.iterconsume(limit=limit)
- while True:
- try:
- six.next(it)
- except StopIteration:
- return
-
- def consume_in_thread(self):
- """Consumer from all queues/consumers in a greenthread."""
- @excutils.forever_retry_uncaught_exceptions
- def _consumer_thread():
- try:
- self.consume()
- except greenlet.GreenletExit:
- return
- if self.consumer_thread is None:
- self.consumer_thread = eventlet.spawn(_consumer_thread)
- return self.consumer_thread
-
- def create_consumer(self, topic, proxy, fanout=False):
- """Create a consumer that calls a method in a proxy object."""
- proxy_cb = rpc_amqp.ProxyCallback(
- self.conf, proxy,
- rpc_amqp.get_connection_pool(self.conf, Connection))
- self.proxy_callbacks.append(proxy_cb)
-
- if fanout:
- self.declare_fanout_consumer(topic, proxy_cb)
- else:
- self.declare_topic_consumer(topic, proxy_cb)
-
- def create_worker(self, topic, proxy, pool_name):
- """Create a worker that calls a method in a proxy object."""
- proxy_cb = rpc_amqp.ProxyCallback(
- self.conf, proxy,
- rpc_amqp.get_connection_pool(self.conf, Connection))
- self.proxy_callbacks.append(proxy_cb)
- self.declare_topic_consumer(topic, proxy_cb, pool_name)
-
- def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None, ack_on_error=True):
- """Register as a member of a group of consumers for a given topic from
- the specified exchange.
-
- Exactly one member of a given pool will receive each message.
-
- A message will be delivered to multiple pools, if more than
- one is created.
- """
- callback_wrapper = rpc_amqp.CallbackWrapper(
- conf=self.conf,
- callback=callback,
- connection_pool=rpc_amqp.get_connection_pool(self.conf,
- Connection),
- wait_for_consumers=not ack_on_error
- )
- self.proxy_callbacks.append(callback_wrapper)
- self.declare_topic_consumer(
- queue_name=pool_name,
- topic=topic,
- exchange_name=exchange_name,
- callback=callback_wrapper,
- ack_on_error=ack_on_error,
- )
-
-
-def create_connection(conf, new=True):
- """Create a connection."""
- return rpc_amqp.create_connection(
- conf, new,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def multicall(conf, context, topic, msg, timeout=None):
- """Make a call that returns multiple times."""
- return rpc_amqp.multicall(
- conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def call(conf, context, topic, msg, timeout=None):
- """Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(
- conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def cast(conf, context, topic, msg):
- """Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(
- conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def fanout_cast(conf, context, topic, msg):
- """Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(
- conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def cast_to_server(conf, context, server_params, topic, msg):
- """Sends a message on a topic to a specific server."""
- return rpc_amqp.cast_to_server(
- conf, context, server_params, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def fanout_cast_to_server(conf, context, server_params, topic, msg):
- """Sends a message on a fanout exchange to a specific server."""
- return rpc_amqp.fanout_cast_to_server(
- conf, context, server_params, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def notify(conf, context, topic, msg, envelope):
- """Sends a notification event on a topic."""
- return rpc_amqp.notify(
- conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection),
- envelope)
-
-
-def cleanup():
- return rpc_amqp.cleanup(Connection.pool)
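The module-level helpers deleted above (create_connection, cast, call, notify, ...) were the kombu-specific entry points exposed by the removed oslo.rpc layer. After this patch ceilometer talks to the broker through oslo.messaging's Transport/Target/RPCClient API instead. A minimal sketch of the equivalent calls, not code from this patch: the 'metering' topic comes from the commit message, and the method name and payload are illustrative only.

    from oslo.config import cfg
    from oslo import messaging

    # Build a transport from the messaging options in the service config.
    transport = messaging.get_transport(cfg.CONF)
    target = messaging.Target(topic='metering')
    client = messaging.RPCClient(transport, target)

    # Fire-and-forget, the role of the removed cast()/topic_send() helpers.
    client.cast({}, 'record_metering_data', data=[{'counter_name': 'cpu'}])

    # Blocking request/response, the role of the removed call()/multicall().
    # result = client.call({}, 'record_metering_data', data=[...])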
diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py
deleted file mode 100644
index 263ff9af..00000000
--- a/ceilometer/openstack/common/rpc/impl_qpid.py
+++ /dev/null
@@ -1,823 +0,0 @@
-# Copyright 2011 OpenStack Foundation
-# Copyright 2011 - 2012, Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import functools
-import itertools
-import time
-
-import eventlet
-import greenlet
-from oslo.config import cfg
-import six
-
-from ceilometer.openstack.common import excutils
-from ceilometer.openstack.common.gettextutils import _, _LE, _LI
-from ceilometer.openstack.common import importutils
-from ceilometer.openstack.common import jsonutils
-from ceilometer.openstack.common import log as logging
-from ceilometer.openstack.common.rpc import amqp as rpc_amqp
-from ceilometer.openstack.common.rpc import common as rpc_common
-
-qpid_codec = importutils.try_import("qpid.codec010")
-qpid_messaging = importutils.try_import("qpid.messaging")
-qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
-
-LOG = logging.getLogger(__name__)
-
-qpid_opts = [
- cfg.StrOpt('qpid_hostname',
- default='localhost',
- help='Qpid broker hostname'),
- cfg.IntOpt('qpid_port',
- default=5672,
- help='Qpid broker port'),
- cfg.ListOpt('qpid_hosts',
- default=['$qpid_hostname:$qpid_port'],
- help='Qpid HA cluster host:port pairs'),
- cfg.StrOpt('qpid_username',
- default='',
- help='Username for qpid connection'),
- cfg.StrOpt('qpid_password',
- default='',
- help='Password for qpid connection',
- secret=True),
- cfg.StrOpt('qpid_sasl_mechanisms',
- default='',
- help='Space separated list of SASL mechanisms to use for auth'),
- cfg.IntOpt('qpid_heartbeat',
- default=60,
- help='Seconds between connection keepalive heartbeats'),
- cfg.StrOpt('qpid_protocol',
- default='tcp',
- help="Transport to use, either 'tcp' or 'ssl'"),
- cfg.BoolOpt('qpid_tcp_nodelay',
- default=True,
- help='Disable Nagle algorithm'),
- # NOTE(russellb) If any additional versions are added (beyond 1 and 2),
- # this file could probably use some additional refactoring so that the
- # differences between each version are split into different classes.
- cfg.IntOpt('qpid_topology_version',
- default=1,
- help="The qpid topology version to use. Version 1 is what "
- "was originally used by impl_qpid. Version 2 includes "
- "some backwards-incompatible changes that allow broker "
- "federation to work. Users should update to version 2 "
- "when they are able to take everything down, as it "
- "requires a clean break."),
-]
-
-cfg.CONF.register_opts(qpid_opts)
-
-JSON_CONTENT_TYPE = 'application/json; charset=utf8'
-
-
-def raise_invalid_topology_version(conf=None):
-    # Some call sites invoke this without arguments, so fall back to the
-    # globally registered options when no conf object is passed in.
-    conf = conf or cfg.CONF
-    msg = (_("Invalid value for qpid_topology_version: %d") %
-           conf.qpid_topology_version)
-    LOG.error(msg)
-    raise Exception(msg)
-
-
-class ConsumerBase(object):
- """Consumer base class."""
-
- def __init__(self, conf, session, callback, node_name, node_opts,
- link_name, link_opts):
- """Declare a queue on an amqp session.
-
- 'session' is the amqp session to use
- 'callback' is the callback to call when messages are received
- 'node_name' is the first part of the Qpid address string, before ';'
- 'node_opts' will be applied to the "x-declare" section of "node"
- in the address string.
- 'link_name' goes into the "name" field of the "link" in the address
- string
- 'link_opts' will be applied to the "x-declare" section of "link"
- in the address string.
- """
- self.callback = callback
- self.receiver = None
- self.session = None
-
- if conf.qpid_topology_version == 1:
- addr_opts = {
- "create": "always",
- "node": {
- "type": "topic",
- "x-declare": {
- "durable": True,
- "auto-delete": True,
- },
- },
- "link": {
- "durable": True,
- "x-declare": {
- "durable": False,
- "auto-delete": True,
- "exclusive": False,
- },
- },
- }
- addr_opts["node"]["x-declare"].update(node_opts)
- elif conf.qpid_topology_version == 2:
- addr_opts = {
- "link": {
- "x-declare": {
- "auto-delete": True,
- "exclusive": False,
- },
- },
- }
- else:
- raise_invalid_topology_version()
-
- addr_opts["link"]["x-declare"].update(link_opts)
- if link_name:
- addr_opts["link"]["name"] = link_name
-
- self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
-
- self.connect(session)
-
- def connect(self, session):
- """Declare the receiver on connect."""
- self._declare_receiver(session)
-
- def reconnect(self, session):
- """Re-declare the receiver after a qpid reconnect."""
- self._declare_receiver(session)
-
- def _declare_receiver(self, session):
- self.session = session
- self.receiver = session.receiver(self.address)
- self.receiver.capacity = 1
-
- def _unpack_json_msg(self, msg):
- """Load the JSON data in msg if msg.content_type indicates that it
- is necessary. Put the loaded data back into msg.content and
- update msg.content_type appropriately.
-
- A Qpid Message containing a dict will have a content_type of
- 'amqp/map', whereas one containing a string that needs to be converted
- back from JSON will have a content_type of JSON_CONTENT_TYPE.
-
- :param msg: a Qpid Message object
- :returns: None
- """
- if msg.content_type == JSON_CONTENT_TYPE:
- msg.content = jsonutils.loads(msg.content)
- msg.content_type = 'amqp/map'
-
- def consume(self):
- """Fetch the message and pass it to the callback object."""
- message = self.receiver.fetch()
- try:
- self._unpack_json_msg(message)
- msg = rpc_common.deserialize_msg(message.content)
- self.callback(msg)
- except Exception:
- LOG.exception(_LE("Failed to process message... skipping it."))
- finally:
- # TODO(sandy): Need support for optional ack_on_error.
- self.session.acknowledge(message)
-
- def get_receiver(self):
- return self.receiver
-
- def get_node_name(self):
- return self.address.split(';')[0]
-
-
-class DirectConsumer(ConsumerBase):
- """Queue/consumer class for 'direct'."""
-
- def __init__(self, conf, session, msg_id, callback):
- """Init a 'direct' queue.
-
- 'session' is the amqp session to use
- 'msg_id' is the msg_id to listen on
- 'callback' is the callback to call when messages are received
- """
-
- link_opts = {
- "auto-delete": conf.amqp_auto_delete,
- "exclusive": True,
- "durable": conf.amqp_durable_queues,
- }
-
- if conf.qpid_topology_version == 1:
- node_name = "%s/%s" % (msg_id, msg_id)
- node_opts = {"type": "direct"}
- link_name = msg_id
- elif conf.qpid_topology_version == 2:
- node_name = "amq.direct/%s" % msg_id
- node_opts = {}
- link_name = None
- else:
- raise_invalid_topology_version()
-
- super(DirectConsumer, self).__init__(conf, session, callback,
- node_name, node_opts, link_name,
- link_opts)
-
-
-class TopicConsumer(ConsumerBase):
- """Consumer class for 'topic'."""
-
- def __init__(self, conf, session, topic, callback, name=None,
- exchange_name=None):
- """Init a 'topic' queue.
-
- :param session: the amqp session to use
- :param topic: is the topic to listen on
- :paramtype topic: str
- :param callback: the callback to call when messages are received
- :param name: optional queue name, defaults to topic
- """
-
- exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
- link_opts = {
- "auto-delete": conf.amqp_auto_delete,
- "durable": conf.amqp_durable_queues,
- }
-
- if conf.qpid_topology_version == 1:
- node_name = "%s/%s" % (exchange_name, topic)
- elif conf.qpid_topology_version == 2:
- node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
- else:
- raise_invalid_topology_version()
-
- super(TopicConsumer, self).__init__(conf, session, callback, node_name,
- {}, name or topic, link_opts)
-
-
-class FanoutConsumer(ConsumerBase):
- """Consumer class for 'fanout'."""
-
- def __init__(self, conf, session, topic, callback):
- """Init a 'fanout' queue.
-
- 'session' is the amqp session to use
- 'topic' is the topic to listen on
- 'callback' is the callback to call when messages are received
- """
- self.conf = conf
-
- link_opts = {"exclusive": True}
-
- if conf.qpid_topology_version == 1:
- node_name = "%s_fanout" % topic
- node_opts = {"durable": False, "type": "fanout"}
- elif conf.qpid_topology_version == 2:
- node_name = "amq.topic/fanout/%s" % topic
- node_opts = {}
- else:
- raise_invalid_topology_version()
-
- super(FanoutConsumer, self).__init__(conf, session, callback,
- node_name, node_opts, None,
- link_opts)
-
-
-class Publisher(object):
- """Base Publisher class."""
-
- def __init__(self, conf, session, node_name, node_opts=None):
- """Init the Publisher class with the exchange_name, routing_key,
- and other options
- """
- self.sender = None
- self.session = session
-
- if conf.qpid_topology_version == 1:
- addr_opts = {
- "create": "always",
- "node": {
- "type": "topic",
- "x-declare": {
- "durable": False,
- # auto-delete isn't implemented for exchanges in qpid,
- # but put in here anyway
- "auto-delete": True,
- },
- },
- }
- if node_opts:
- addr_opts["node"]["x-declare"].update(node_opts)
-
- self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
- elif conf.qpid_topology_version == 2:
- self.address = node_name
- else:
- raise_invalid_topology_version()
-
- self.reconnect(session)
-
- def reconnect(self, session):
- """Re-establish the Sender after a reconnection."""
- self.sender = session.sender(self.address)
-
- def _pack_json_msg(self, msg):
- """Qpid cannot serialize dicts containing strings longer than 65535
- characters. This function dumps the message content to a JSON
- string, which Qpid is able to handle.
-
- :param msg: May be either a Qpid Message object or a bare dict.
- :returns: A Qpid Message with its content field JSON encoded.
- """
- try:
- msg.content = jsonutils.dumps(msg.content)
- except AttributeError:
- # Need to have a Qpid message so we can set the content_type.
- msg = qpid_messaging.Message(jsonutils.dumps(msg))
- msg.content_type = JSON_CONTENT_TYPE
- return msg
-
- def send(self, msg):
- """Send a message."""
- try:
- # Check if Qpid can encode the message
- check_msg = msg
- if not hasattr(check_msg, 'content_type'):
- check_msg = qpid_messaging.Message(msg)
- content_type = check_msg.content_type
- enc, dec = qpid_messaging.message.get_codec(content_type)
- enc(check_msg.content)
- except qpid_codec.CodecException:
- # This means the message couldn't be serialized as a dict.
- msg = self._pack_json_msg(msg)
- self.sender.send(msg)
-
-
-class DirectPublisher(Publisher):
- """Publisher class for 'direct'."""
- def __init__(self, conf, session, msg_id):
- """Init a 'direct' publisher."""
-
- if conf.qpid_topology_version == 1:
- node_name = msg_id
- node_opts = {"type": "direct"}
- elif conf.qpid_topology_version == 2:
- node_name = "amq.direct/%s" % msg_id
- node_opts = {}
- else:
- raise_invalid_topology_version()
-
- super(DirectPublisher, self).__init__(conf, session, node_name,
- node_opts)
-
-
-class TopicPublisher(Publisher):
- """Publisher class for 'topic'."""
- def __init__(self, conf, session, topic):
- """Init a 'topic' publisher.
- """
- exchange_name = rpc_amqp.get_control_exchange(conf)
-
- if conf.qpid_topology_version == 1:
- node_name = "%s/%s" % (exchange_name, topic)
- elif conf.qpid_topology_version == 2:
- node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
- else:
- raise_invalid_topology_version()
-
- super(TopicPublisher, self).__init__(conf, session, node_name)
-
-
-class FanoutPublisher(Publisher):
- """Publisher class for 'fanout'."""
- def __init__(self, conf, session, topic):
- """Init a 'fanout' publisher.
- """
-
- if conf.qpid_topology_version == 1:
- node_name = "%s_fanout" % topic
- node_opts = {"type": "fanout"}
- elif conf.qpid_topology_version == 2:
- node_name = "amq.topic/fanout/%s" % topic
- node_opts = {}
- else:
- raise_invalid_topology_version()
-
- super(FanoutPublisher, self).__init__(conf, session, node_name,
- node_opts)
-
-
-class NotifyPublisher(Publisher):
- """Publisher class for notifications."""
- def __init__(self, conf, session, topic):
- """Init a 'topic' publisher.
- """
- exchange_name = rpc_amqp.get_control_exchange(conf)
- node_opts = {"durable": True}
-
- if conf.qpid_topology_version == 1:
- node_name = "%s/%s" % (exchange_name, topic)
- elif conf.qpid_topology_version == 2:
- node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
- else:
- raise_invalid_topology_version()
-
- super(NotifyPublisher, self).__init__(conf, session, node_name,
- node_opts)
-
-
-class Connection(object):
- """Connection object."""
-
- pool = None
-
- def __init__(self, conf, server_params=None):
- if not qpid_messaging:
- raise ImportError("Failed to import qpid.messaging")
-
- self.session = None
- self.consumers = {}
- self.consumer_thread = None
- self.proxy_callbacks = []
- self.conf = conf
-
- if server_params and 'hostname' in server_params:
- # NOTE(russellb) This enables support for cast_to_server.
- server_params['qpid_hosts'] = [
- '%s:%d' % (server_params['hostname'],
- server_params.get('port', 5672))
- ]
-
- params = {
- 'qpid_hosts': self.conf.qpid_hosts,
- 'username': self.conf.qpid_username,
- 'password': self.conf.qpid_password,
- }
- params.update(server_params or {})
-
- self.brokers = params['qpid_hosts']
- self.username = params['username']
- self.password = params['password']
-
- brokers_count = len(self.brokers)
- self.next_broker_indices = itertools.cycle(range(brokers_count))
-
- self.connection_create(self.brokers[0])
- self.reconnect()
-
- def connection_create(self, broker):
- # Create the connection - this does not open the connection
- self.connection = qpid_messaging.Connection(broker)
-
- # Check if flags are set and if so set them for the connection
- # before we call open
- self.connection.username = self.username
- self.connection.password = self.password
-
- self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
- # Reconnection is done by self.reconnect()
- self.connection.reconnect = False
- self.connection.heartbeat = self.conf.qpid_heartbeat
- self.connection.transport = self.conf.qpid_protocol
- self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
-
- def _register_consumer(self, consumer):
- self.consumers[str(consumer.get_receiver())] = consumer
-
- def _lookup_consumer(self, receiver):
- return self.consumers[str(receiver)]
-
- def reconnect(self):
- """Handles reconnecting and re-establishing sessions and queues."""
- delay = 1
- while True:
- # Close the session if necessary
- if self.connection.opened():
- try:
- self.connection.close()
- except qpid_exceptions.ConnectionError:
- pass
-
- broker = self.brokers[next(self.next_broker_indices)]
-
- try:
- self.connection_create(broker)
- self.connection.open()
- except qpid_exceptions.ConnectionError as e:
- msg_dict = dict(e=e, delay=delay)
- msg = _LE("Unable to connect to AMQP server: %(e)s. "
- "Sleeping %(delay)s seconds") % msg_dict
- LOG.error(msg)
- time.sleep(delay)
- delay = min(delay + 1, 5)
- else:
- LOG.info(_LI('Connected to AMQP server on %s'), broker)
- break
-
- self.session = self.connection.session()
-
- if self.consumers:
- consumers = self.consumers
- self.consumers = {}
-
- for consumer in six.itervalues(consumers):
- consumer.reconnect(self.session)
- self._register_consumer(consumer)
-
- LOG.debug("Re-established AMQP queues")
-
- def ensure(self, error_callback, method, *args, **kwargs):
- while True:
- try:
- return method(*args, **kwargs)
- except (qpid_exceptions.Empty,
- qpid_exceptions.ConnectionError) as e:
- if error_callback:
- error_callback(e)
- self.reconnect()
-
- def close(self):
- """Close/release this connection."""
- self.cancel_consumer_thread()
- self.wait_on_proxy_callbacks()
- try:
- self.connection.close()
- except Exception:
- # NOTE(dripton) Logging exceptions that happen during cleanup just
- # causes confusion; there's really nothing useful we can do with
- # them.
- pass
- self.connection = None
-
- def reset(self):
- """Reset a connection so it can be used again."""
- self.cancel_consumer_thread()
- self.wait_on_proxy_callbacks()
- self.session.close()
- self.session = self.connection.session()
- self.consumers = {}
-
- def declare_consumer(self, consumer_cls, topic, callback):
- """Create a Consumer using the class that was passed in and
- add it to our list of consumers
- """
- def _connect_error(exc):
- log_info = {'topic': topic, 'err_str': str(exc)}
- LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
- "%(err_str)s") % log_info)
-
- def _declare_consumer():
- consumer = consumer_cls(self.conf, self.session, topic, callback)
- self._register_consumer(consumer)
- return consumer
-
- return self.ensure(_connect_error, _declare_consumer)
-
- def iterconsume(self, limit=None, timeout=None):
- """Return an iterator that will consume from all queues/consumers."""
-
- def _error_callback(exc):
- if isinstance(exc, qpid_exceptions.Empty):
- LOG.debug('Timed out waiting for RPC response: %s' %
- str(exc))
- raise rpc_common.Timeout()
- else:
- LOG.exception(_LE('Failed to consume message from queue: %s') %
- str(exc))
-
- def _consume():
- nxt_receiver = self.session.next_receiver(timeout=timeout)
- try:
- self._lookup_consumer(nxt_receiver).consume()
- except Exception:
- LOG.exception(_LE("Error processing message. Skipping it."))
-
- for iteration in itertools.count(0):
- if limit and iteration >= limit:
- raise StopIteration
- yield self.ensure(_error_callback, _consume)
-
- def cancel_consumer_thread(self):
- """Cancel a consumer thread."""
- if self.consumer_thread is not None:
- self.consumer_thread.kill()
- try:
- self.consumer_thread.wait()
- except greenlet.GreenletExit:
- pass
- self.consumer_thread = None
-
- def wait_on_proxy_callbacks(self):
- """Wait for all proxy callback threads to exit."""
- for proxy_cb in self.proxy_callbacks:
- proxy_cb.wait()
-
- def publisher_send(self, cls, topic, msg):
- """Send to a publisher based on the publisher class."""
-
- def _connect_error(exc):
- log_info = {'topic': topic, 'err_str': str(exc)}
- LOG.exception(_LE("Failed to publish message to topic "
- "'%(topic)s': %(err_str)s") % log_info)
-
- def _publisher_send():
- publisher = cls(self.conf, self.session, topic)
- publisher.send(msg)
-
- return self.ensure(_connect_error, _publisher_send)
-
- def declare_direct_consumer(self, topic, callback):
- """Create a 'direct' queue.
- In nova's use, this is generally a msg_id queue used for
- responses for call/multicall
- """
- self.declare_consumer(DirectConsumer, topic, callback)
-
- def declare_topic_consumer(self, topic, callback=None, queue_name=None,
- exchange_name=None):
- """Create a 'topic' consumer."""
- self.declare_consumer(functools.partial(TopicConsumer,
- name=queue_name,
- exchange_name=exchange_name,
- ),
- topic, callback)
-
- def declare_fanout_consumer(self, topic, callback):
- """Create a 'fanout' consumer."""
- self.declare_consumer(FanoutConsumer, topic, callback)
-
- def direct_send(self, msg_id, msg):
- """Send a 'direct' message."""
- self.publisher_send(DirectPublisher, msg_id, msg)
-
- def topic_send(self, topic, msg, timeout=None):
- """Send a 'topic' message."""
- #
- # We want to create a message with attributes, e.g. a TTL. We
- # don't really need to keep 'msg' in its JSON format any longer
- # so let's create an actual qpid message here and get some
- # value-add on the go.
- #
- # WARNING: Request timeout happens to be in the same units as
- # qpid's TTL (seconds). If this changes in the future, then this
- # will need to be altered accordingly.
- #
- qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
- self.publisher_send(TopicPublisher, topic, qpid_message)
-
- def fanout_send(self, topic, msg):
- """Send a 'fanout' message."""
- self.publisher_send(FanoutPublisher, topic, msg)
-
- def notify_send(self, topic, msg, **kwargs):
- """Send a notify message on a topic."""
- self.publisher_send(NotifyPublisher, topic, msg)
-
- def consume(self, limit=None):
- """Consume from all queues/consumers."""
- it = self.iterconsume(limit=limit)
- while True:
- try:
- six.next(it)
- except StopIteration:
- return
-
- def consume_in_thread(self):
- """Consumer from all queues/consumers in a greenthread."""
- @excutils.forever_retry_uncaught_exceptions
- def _consumer_thread():
- try:
- self.consume()
- except greenlet.GreenletExit:
- return
- if self.consumer_thread is None:
- self.consumer_thread = eventlet.spawn(_consumer_thread)
- return self.consumer_thread
-
- def create_consumer(self, topic, proxy, fanout=False):
- """Create a consumer that calls a method in a proxy object."""
- proxy_cb = rpc_amqp.ProxyCallback(
- self.conf, proxy,
- rpc_amqp.get_connection_pool(self.conf, Connection))
- self.proxy_callbacks.append(proxy_cb)
-
- if fanout:
- consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
- else:
- consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)
-
- self._register_consumer(consumer)
-
- return consumer
-
- def create_worker(self, topic, proxy, pool_name):
- """Create a worker that calls a method in a proxy object."""
- proxy_cb = rpc_amqp.ProxyCallback(
- self.conf, proxy,
- rpc_amqp.get_connection_pool(self.conf, Connection))
- self.proxy_callbacks.append(proxy_cb)
-
- consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
- name=pool_name)
-
- self._register_consumer(consumer)
-
- return consumer
-
- def join_consumer_pool(self, callback, pool_name, topic,
- exchange_name=None, ack_on_error=True):
- """Register as a member of a group of consumers for a given topic from
- the specified exchange.
-
- Exactly one member of a given pool will receive each message.
-
- A message will be delivered to multiple pools, if more than
- one is created.
- """
- callback_wrapper = rpc_amqp.CallbackWrapper(
- conf=self.conf,
- callback=callback,
- connection_pool=rpc_amqp.get_connection_pool(self.conf,
- Connection),
- wait_for_consumers=not ack_on_error
- )
- self.proxy_callbacks.append(callback_wrapper)
-
- consumer = TopicConsumer(conf=self.conf,
- session=self.session,
- topic=topic,
- callback=callback_wrapper,
- name=pool_name,
- exchange_name=exchange_name)
-
- self._register_consumer(consumer)
- return consumer
-
-
-def create_connection(conf, new=True):
- """Create a connection."""
- return rpc_amqp.create_connection(
- conf, new,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def multicall(conf, context, topic, msg, timeout=None):
- """Make a call that returns multiple times."""
- return rpc_amqp.multicall(
- conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def call(conf, context, topic, msg, timeout=None):
- """Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(
- conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def cast(conf, context, topic, msg):
- """Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(
- conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def fanout_cast(conf, context, topic, msg):
- """Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(
- conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def cast_to_server(conf, context, server_params, topic, msg):
- """Sends a message on a topic to a specific server."""
- return rpc_amqp.cast_to_server(
- conf, context, server_params, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def fanout_cast_to_server(conf, context, server_params, topic, msg):
- """Sends a message on a fanout exchange to a specific server."""
- return rpc_amqp.fanout_cast_to_server(
- conf, context, server_params, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def notify(conf, context, topic, msg, envelope):
- """Sends a notification event on a topic."""
- return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection),
- envelope)
-
-
-def cleanup():
- return rpc_amqp.cleanup(Connection.pool)
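With the qpid driver removed, the choice between rabbit and qpid is no longer made by importing impl_kombu or impl_qpid: oslo.messaging selects its driver from configuration (rpc_backend) or from the scheme of a transport URL, and the NotifyPublisher role above is covered by messaging.Notifier. A rough sketch under those assumptions, not code from this patch; the URL, publisher_id, and event payload are placeholders.

    from oslo.config import cfg
    from oslo import messaging

    # Driver selection by URL scheme ('rabbit://' or 'qpid://'); the
    # credentials and host here are placeholders.
    transport = messaging.get_transport(
        cfg.CONF, url='qpid://guest:guest@localhost:5672/')

    notifier = messaging.Notifier(transport,
                                  driver='messaging',
                                  publisher_id='ceilometer.example')
    notifier.info({}, 'telemetry.example', {'payload': 42})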
diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py
deleted file mode 100644
index 4f540d9f..00000000
--- a/ceilometer/openstack/common/rpc/impl_zmq.py
+++ /dev/null
@@ -1,818 +0,0 @@
-# Copyright 2011 Cloudscaling Group, Inc
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import os
-import pprint
-import re
-import socket
-import sys
-import types
-import uuid
-
-import eventlet
-import greenlet
-from oslo.config import cfg
-import six
-from six import moves
-
-from ceilometer.openstack.common import excutils
-from ceilometer.openstack.common.gettextutils import _, _LE, _LI
-from ceilometer.openstack.common import importutils
-from ceilometer.openstack.common import jsonutils
-from ceilometer.openstack.common.rpc import common as rpc_common
-
-zmq = importutils.try_import('eventlet.green.zmq')
-
-# Module-level aliases, kept for convenience; they are not modified.
-pformat = pprint.pformat
-Timeout = eventlet.timeout.Timeout
-LOG = rpc_common.LOG
-RemoteError = rpc_common.RemoteError
-RPCException = rpc_common.RPCException
-
-zmq_opts = [
- cfg.StrOpt('rpc_zmq_bind_address', default='*',
- help='ZeroMQ bind address. Should be a wildcard (*), '
- 'an ethernet interface, or IP. '
- 'The "host" option should point or resolve to this '
- 'address.'),
-
- # The module.Class to use for matchmaking.
- cfg.StrOpt(
- 'rpc_zmq_matchmaker',
- default=('ceilometer.openstack.common.rpc.'
- 'matchmaker.MatchMakerLocalhost'),
- help='MatchMaker driver',
- ),
-
- # The following port is unassigned by IANA as of 2012-05-21
- cfg.IntOpt('rpc_zmq_port', default=9501,
- help='ZeroMQ receiver listening port'),
-
- cfg.IntOpt('rpc_zmq_contexts', default=1,
- help='Number of ZeroMQ contexts, defaults to 1'),
-
- cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
- help='Maximum number of ingress messages to locally buffer '
- 'per topic. Default is unlimited.'),
-
- cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
- help='Directory for holding IPC sockets'),
-
- cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
- help='Name of this node. Must be a valid hostname, FQDN, or '
- 'IP address. Must match "host" option, if running Nova.')
-]
-
-
-CONF = cfg.CONF
-CONF.register_opts(zmq_opts)
-
-ZMQ_CTX = None # ZeroMQ Context, must be global.
-matchmaker = None # memorized matchmaker object
-
-
-def _serialize(data):
- """Serialization wrapper.
-
- We prefer using JSON, but it cannot encode all types.
- Error if a developer passes us bad data.
- """
- try:
- return jsonutils.dumps(data, ensure_ascii=True)
- except TypeError:
- with excutils.save_and_reraise_exception():
- LOG.error(_LE("JSON serialization failed."))
-
-
-def _deserialize(data):
- """Deserialization wrapper."""
- LOG.debug("Deserializing: %s", data)
- return jsonutils.loads(data)
-
-
-class ZmqSocket(object):
- """A tiny wrapper around ZeroMQ.
-
- Simplifies the send/recv protocol and connection management.
- Can be used as a Context (supports the 'with' statement).
- """
-
- def __init__(self, addr, zmq_type, bind=True, subscribe=None):
- self.sock = _get_ctxt().socket(zmq_type)
- self.addr = addr
- self.type = zmq_type
- self.subscriptions = []
-
- # Support failures on sending/receiving on wrong socket type.
- self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
- self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
- self.can_sub = zmq_type in (zmq.SUB, )
-
- # Support list, str, & None for subscribe arg (cast to list)
- do_sub = {
- list: subscribe,
- str: [subscribe],
- type(None): []
- }[type(subscribe)]
-
- for f in do_sub:
- self.subscribe(f)
-
- str_data = {'addr': addr, 'type': self.socket_s(),
- 'subscribe': subscribe, 'bind': bind}
-
- LOG.debug("Connecting to %(addr)s with %(type)s", str_data)
- LOG.debug("-> Subscribed to %(subscribe)s", str_data)
- LOG.debug("-> bind: %(bind)s", str_data)
-
- try:
- if bind:
- self.sock.bind(addr)
- else:
- self.sock.connect(addr)
- except Exception:
- raise RPCException(_("Could not open socket."))
-
- def socket_s(self):
- """Get socket type as string."""
- t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
- 'DEALER')
- return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]
-
- def subscribe(self, msg_filter):
- """Subscribe."""
- if not self.can_sub:
- raise RPCException("Cannot subscribe on this socket.")
- LOG.debug("Subscribing to %s", msg_filter)
-
- try:
- self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
- except Exception:
- return
-
- self.subscriptions.append(msg_filter)
-
- def unsubscribe(self, msg_filter):
- """Unsubscribe."""
- if msg_filter not in self.subscriptions:
- return
- self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
- self.subscriptions.remove(msg_filter)
-
- def close(self):
- if self.sock is None or self.sock.closed:
- return
-
- # We must unsubscribe, or we'll leak descriptors.
- if self.subscriptions:
- for f in self.subscriptions:
- try:
- self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
- except Exception:
- pass
- self.subscriptions = []
-
- try:
- # Default is to linger
- self.sock.close()
- except Exception:
- # While this is a bad thing to happen,
- # it would be much worse if some of the code calling this
- # were to fail. For now, lets log, and later evaluate
- # if we can safely raise here.
- LOG.error(_LE("ZeroMQ socket could not be closed."))
- self.sock = None
-
- def recv(self, **kwargs):
- if not self.can_recv:
- raise RPCException(_("You cannot recv on this socket."))
- return self.sock.recv_multipart(**kwargs)
-
- def send(self, data, **kwargs):
- if not self.can_send:
- raise RPCException(_("You cannot send on this socket."))
- self.sock.send_multipart(data, **kwargs)
-
-
-class ZmqClient(object):
- """Client for ZMQ sockets."""
-
- def __init__(self, addr):
- self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
-
- def cast(self, msg_id, topic, data, envelope):
- msg_id = msg_id or 0
-
- if not envelope:
- self.outq.send(map(bytes,
- (msg_id, topic, 'cast', _serialize(data))))
- return
-
- rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
- zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
- self.outq.send(map(bytes,
- (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
-
- def close(self):
- self.outq.close()
-
-
-class RpcContext(rpc_common.CommonRpcContext):
- """Context that supports replying to a rpc.call."""
- def __init__(self, **kwargs):
- self.replies = []
- super(RpcContext, self).__init__(**kwargs)
-
- def deepcopy(self):
- values = self.to_dict()
- values['replies'] = self.replies
- return self.__class__(**values)
-
- def reply(self, reply=None, failure=None, ending=False):
- if ending:
- return
- self.replies.append(reply)
-
- @classmethod
- def marshal(self, ctx):
- ctx_data = ctx.to_dict()
- return _serialize(ctx_data)
-
- @classmethod
- def unmarshal(self, data):
- return RpcContext.from_dict(_deserialize(data))
-
-
-class InternalContext(object):
- """Used by ConsumerBase as a private context for - methods."""
-
- def __init__(self, proxy):
- self.proxy = proxy
- self.msg_waiter = None
-
- def _get_response(self, ctx, proxy, topic, data):
- """Process a curried message and cast the result to topic."""
- LOG.debug("Running func with context: %s", ctx.to_dict())
- data.setdefault('version', None)
- data.setdefault('args', {})
-
- try:
- result = proxy.dispatch(
- ctx, data['version'], data['method'],
- data.get('namespace'), **data['args'])
- return ConsumerBase.normalize_reply(result, ctx.replies)
- except greenlet.GreenletExit:
- # ignore these since they are just from shutdowns
- pass
- except rpc_common.ClientException as e:
- LOG.debug("Expected exception during message handling (%s)" %
- e._exc_info[1])
- return {'exc':
- rpc_common.serialize_remote_exception(e._exc_info,
- log_failure=False)}
- except Exception:
- LOG.error(_LE("Exception during message handling"))
- return {'exc':
- rpc_common.serialize_remote_exception(sys.exc_info())}
-
- def reply(self, ctx, proxy,
- msg_id=None, context=None, topic=None, msg=None):
- """Reply to a casted call."""
- # NOTE(ewindisch): context kwarg exists for Grizzly compat.
- # this may be able to be removed earlier than
- # 'I' if ConsumerBase.process were refactored.
- if type(msg) is list:
- payload = msg[-1]
- else:
- payload = msg
-
- response = ConsumerBase.normalize_reply(
- self._get_response(ctx, proxy, topic, payload),
- ctx.replies)
-
- LOG.debug("Sending reply")
- _multi_send(_cast, ctx, topic, {
- 'method': '-process_reply',
- 'args': {
- 'msg_id': msg_id, # Include for Folsom compat.
- 'response': response
- }
- }, _msg_id=msg_id)
-
-
-class ConsumerBase(object):
- """Base Consumer."""
-
- def __init__(self):
- self.private_ctx = InternalContext(None)
-
- @classmethod
- def normalize_reply(self, result, replies):
- #TODO(ewindisch): re-evaluate and document this method.
- if isinstance(result, types.GeneratorType):
- return list(result)
- elif replies:
- return replies
- else:
- return [result]
-
- def process(self, proxy, ctx, data):
- data.setdefault('version', None)
- data.setdefault('args', {})
-
-        # Methods whose names start with '-' are processed internally;
-        # the '-' makes them invalid as regular RPC method names.
- method = data.get('method')
- if not method:
- LOG.error(_LE("RPC message did not include method."))
- return
-
- # Internal method
- # uses internal context for safety.
- if method == '-reply':
- self.private_ctx.reply(ctx, proxy, **data['args'])
- return
-
- proxy.dispatch(ctx, data['version'],
- data['method'], data.get('namespace'), **data['args'])
-
-
-class ZmqBaseReactor(ConsumerBase):
- """A consumer class implementing a centralized casting broker (PULL-PUSH).
-
- Used for RoundRobin requests.
- """
-
- def __init__(self, conf):
- super(ZmqBaseReactor, self).__init__()
-
- self.proxies = {}
- self.threads = []
- self.sockets = []
- self.subscribe = {}
-
- self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
-
- def register(self, proxy, in_addr, zmq_type_in,
- in_bind=True, subscribe=None):
-
- LOG.info(_LI("Registering reactor"))
-
- if zmq_type_in not in (zmq.PULL, zmq.SUB):
- raise RPCException("Bad input socktype")
-
- # Items push in.
- inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
- subscribe=subscribe)
-
- self.proxies[inq] = proxy
- self.sockets.append(inq)
-
- LOG.info(_LI("In reactor registered"))
-
- def consume_in_thread(self):
- @excutils.forever_retry_uncaught_exceptions
- def _consume(sock):
- LOG.info(_LI("Consuming socket"))
- while True:
- self.consume(sock)
-
- for k in self.proxies.keys():
- self.threads.append(
- self.pool.spawn(_consume, k)
- )
-
- def wait(self):
- for t in self.threads:
- t.wait()
-
- def close(self):
- for s in self.sockets:
- s.close()
-
- for t in self.threads:
- t.kill()
-
-
-class ZmqProxy(ZmqBaseReactor):
- """A consumer class implementing a topic-based proxy.
-
- Forwards to IPC sockets.
- """
-
- def __init__(self, conf):
- super(ZmqProxy, self).__init__(conf)
- pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
- self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
-
- self.topic_proxy = {}
-
- def consume(self, sock):
- ipc_dir = CONF.rpc_zmq_ipc_dir
-
- data = sock.recv(copy=False)
- topic = data[1].bytes
-
- if topic.startswith('fanout~'):
- sock_type = zmq.PUB
- topic = topic.split('.', 1)[0]
- elif topic.startswith('zmq_replies'):
- sock_type = zmq.PUB
- else:
- sock_type = zmq.PUSH
-
- if topic not in self.topic_proxy:
- def publisher(waiter):
- LOG.info(_LI("Creating proxy for topic: %s"), topic)
-
- try:
- # The topic is received over the network,
- # don't trust this input.
- if self.badchars.search(topic) is not None:
- emsg = _("Topic contained dangerous characters.")
- LOG.warn(emsg)
- raise RPCException(emsg)
-
- out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
- (ipc_dir, topic),
- sock_type, bind=True)
- except RPCException:
- waiter.send_exception(*sys.exc_info())
- return
-
- self.topic_proxy[topic] = eventlet.queue.LightQueue(
- CONF.rpc_zmq_topic_backlog)
- self.sockets.append(out_sock)
-
- # It takes some time for a pub socket to open,
- # before we can have any faith in doing a send() to it.
- if sock_type == zmq.PUB:
- eventlet.sleep(.5)
-
- waiter.send(True)
-
-                while True:
- data = self.topic_proxy[topic].get()
- out_sock.send(data, copy=False)
-
- wait_sock_creation = eventlet.event.Event()
- eventlet.spawn(publisher, wait_sock_creation)
-
- try:
- wait_sock_creation.wait()
- except RPCException:
- LOG.error(_LE("Topic socket file creation failed."))
- return
-
- try:
- self.topic_proxy[topic].put_nowait(data)
- except eventlet.queue.Full:
- LOG.error(_LE("Local per-topic backlog buffer full for topic "
- "%(topic)s. Dropping message.") % {'topic': topic})
-
- def consume_in_thread(self):
- """Runs the ZmqProxy service."""
- ipc_dir = CONF.rpc_zmq_ipc_dir
- consume_in = "tcp://%s:%s" % \
- (CONF.rpc_zmq_bind_address,
- CONF.rpc_zmq_port)
- consumption_proxy = InternalContext(None)
-
- try:
- os.makedirs(ipc_dir)
- except os.error:
- if not os.path.isdir(ipc_dir):
- with excutils.save_and_reraise_exception():
- LOG.error(_LE("Required IPC directory does not exist at"
- " %s") % (ipc_dir, ))
- try:
- self.register(consumption_proxy,
- consume_in,
- zmq.PULL)
- except zmq.ZMQError:
- if os.access(ipc_dir, os.X_OK):
- with excutils.save_and_reraise_exception():
- LOG.error(_LE("Permission denied to IPC directory at"
- " %s") % (ipc_dir, ))
- with excutils.save_and_reraise_exception():
- LOG.error(_LE("Could not create ZeroMQ receiver daemon. "
- "Socket may already be in use."))
-
- super(ZmqProxy, self).consume_in_thread()
-
-
-def unflatten_envelope(packenv):
- """Unflattens the RPC envelope.
-
- Takes a list and returns a dictionary.
-    e.g. [1, 2, 3, 4] => {1: 2, 3: 4}
- """
- i = iter(packenv)
- h = {}
- try:
- while True:
- k = six.next(i)
- h[k] = six.next(i)
- except StopIteration:
- return h
-
-
-class ZmqReactor(ZmqBaseReactor):
- """A consumer class implementing a consumer for messages.
-
- Can also be used as a 1:1 proxy
- """
-
- def __init__(self, conf):
- super(ZmqReactor, self).__init__(conf)
-
- def consume(self, sock):
- #TODO(ewindisch): use zero-copy (i.e. references, not copying)
- data = sock.recv()
- LOG.debug("CONSUMER RECEIVED DATA: %s", data)
-
- proxy = self.proxies[sock]
-
- if data[2] == 'cast': # Legacy protocol
- packenv = data[3]
-
- ctx, msg = _deserialize(packenv)
- request = rpc_common.deserialize_msg(msg)
- ctx = RpcContext.unmarshal(ctx)
- elif data[2] == 'impl_zmq_v2':
- packenv = data[4:]
-
- msg = unflatten_envelope(packenv)
- request = rpc_common.deserialize_msg(msg)
-
- # Unmarshal only after verifying the message.
- ctx = RpcContext.unmarshal(data[3])
- else:
- LOG.error(_LE("ZMQ Envelope version unsupported or unknown."))
- return
-
- self.pool.spawn_n(self.process, proxy, ctx, request)
-
-
-class Connection(rpc_common.Connection):
- """Manages connections and threads."""
-
- def __init__(self, conf):
- self.topics = []
- self.reactor = ZmqReactor(conf)
-
- def create_consumer(self, topic, proxy, fanout=False):
- # Register with matchmaker.
- _get_matchmaker().register(topic, CONF.rpc_zmq_host)
-
- # Subscription scenarios
- if fanout:
- sock_type = zmq.SUB
- subscribe = ('', fanout)[type(fanout) == str]
- topic = 'fanout~' + topic.split('.', 1)[0]
- else:
- sock_type = zmq.PULL
- subscribe = None
- topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
-
- if topic in self.topics:
- LOG.info(_LI("Skipping topic registration. Already registered."))
- return
-
- # Receive messages from (local) proxy
- inaddr = "ipc://%s/zmq_topic_%s" % \
- (CONF.rpc_zmq_ipc_dir, topic)
-
- LOG.debug("Consumer is a zmq.%s",
- ['PULL', 'SUB'][sock_type == zmq.SUB])
-
- self.reactor.register(proxy, inaddr, sock_type,
- subscribe=subscribe, in_bind=False)
- self.topics.append(topic)
-
- def close(self):
- _get_matchmaker().stop_heartbeat()
- for topic in self.topics:
- _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
-
- self.reactor.close()
- self.topics = []
-
- def wait(self):
- self.reactor.wait()
-
- def consume_in_thread(self):
- _get_matchmaker().start_heartbeat()
- self.reactor.consume_in_thread()
-
-
-def _cast(addr, context, topic, msg, timeout=None, envelope=False,
- _msg_id=None):
- timeout_cast = timeout or CONF.rpc_cast_timeout
- payload = [RpcContext.marshal(context), msg]
-
- with Timeout(timeout_cast, exception=rpc_common.Timeout):
- try:
- conn = ZmqClient(addr)
-
- # assumes cast can't return an exception
- conn.cast(_msg_id, topic, payload, envelope)
- except zmq.ZMQError:
- raise RPCException("Cast failed. ZMQ Socket Exception")
- finally:
- if 'conn' in vars():
- conn.close()
-
-
-def _call(addr, context, topic, msg, timeout=None,
- envelope=False):
- # timeout_response is how long we wait for a response
- timeout = timeout or CONF.rpc_response_timeout
-
- # The msg_id is used to track replies.
- msg_id = uuid.uuid4().hex
-
- # Replies always come into the reply service.
- reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
-
- LOG.debug("Creating payload")
- # Curry the original request into a reply method.
- mcontext = RpcContext.marshal(context)
- payload = {
- 'method': '-reply',
- 'args': {
- 'msg_id': msg_id,
- 'topic': reply_topic,
- # TODO(ewindisch): safe to remove mcontext in I.
- 'msg': [mcontext, msg]
- }
- }
-
- LOG.debug("Creating queue socket for reply waiter")
-
- # Messages arriving async.
- # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
- with Timeout(timeout, exception=rpc_common.Timeout):
- try:
- msg_waiter = ZmqSocket(
- "ipc://%s/zmq_topic_zmq_replies.%s" %
- (CONF.rpc_zmq_ipc_dir,
- CONF.rpc_zmq_host),
- zmq.SUB, subscribe=msg_id, bind=False
- )
-
- LOG.debug("Sending cast")
- _cast(addr, context, topic, payload, envelope)
-
- LOG.debug("Cast sent; Waiting reply")
- # Blocks until receives reply
- msg = msg_waiter.recv()
- LOG.debug("Received message: %s", msg)
- LOG.debug("Unpacking response")
-
- if msg[2] == 'cast': # Legacy version
- raw_msg = _deserialize(msg[-1])[-1]
- elif msg[2] == 'impl_zmq_v2':
- rpc_envelope = unflatten_envelope(msg[4:])
- raw_msg = rpc_common.deserialize_msg(rpc_envelope)
- else:
- raise rpc_common.UnsupportedRpcEnvelopeVersion(
- _("Unsupported or unknown ZMQ envelope returned."))
-
- responses = raw_msg['args']['response']
- # ZMQError trumps the Timeout error.
- except zmq.ZMQError:
- raise RPCException("ZMQ Socket Error")
- except (IndexError, KeyError):
- raise RPCException(_("RPC Message Invalid."))
- finally:
- if 'msg_waiter' in vars():
- msg_waiter.close()
-
- # It seems we don't need to do all of the following,
- # but perhaps it would be useful for multicall?
- # One effect of this is that we're checking all
- # responses for Exceptions.
- for resp in responses:
- if isinstance(resp, types.DictType) and 'exc' in resp:
- raise rpc_common.deserialize_remote_exception(CONF, resp['exc'])
-
- return responses[-1]
-
-
-def _multi_send(method, context, topic, msg, timeout=None,
- envelope=False, _msg_id=None):
- """Wraps the sending of messages.
-
- Dispatches to the matchmaker and sends message to all relevant hosts.
- """
- conf = CONF
- LOG.debug("%(msg)s" % {'msg': ' '.join(map(pformat, (topic, msg)))})
-
- queues = _get_matchmaker().queues(topic)
- LOG.debug("Sending message(s) to: %s", queues)
-
- # Don't stack if we have no matchmaker results
- if not queues:
- LOG.warn(_("No matchmaker results. Not casting."))
- # While not strictly a timeout, callers know how to handle
- # this exception and a timeout isn't too big a lie.
- raise rpc_common.Timeout(_("No match from matchmaker."))
-
- # This supports brokerless fanout (addresses > 1)
- for queue in queues:
- (_topic, ip_addr) = queue
- _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
-
- if method.__name__ == '_cast':
- eventlet.spawn_n(method, _addr, context,
- _topic, msg, timeout, envelope,
- _msg_id)
- return
- return method(_addr, context, _topic, msg, timeout,
- envelope)
-
-
-def create_connection(conf, new=True):
- return Connection(conf)
-
-
-def multicall(conf, *args, **kwargs):
- """Multiple calls."""
- return _multi_send(_call, *args, **kwargs)
-
-
-def call(conf, *args, **kwargs):
- """Send a message, expect a response."""
- data = _multi_send(_call, *args, **kwargs)
- return data[-1]
-
-
-def cast(conf, *args, **kwargs):
- """Send a message expecting no reply."""
- _multi_send(_cast, *args, **kwargs)
-
-
-def fanout_cast(conf, context, topic, msg, **kwargs):
- """Send a message to all listening and expect no reply."""
-    # NOTE(ewindisch): fanout~ is used because it avoids splitting on '.'
- # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
- _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
-
-
-def notify(conf, context, topic, msg, envelope):
- """Send notification event.
-
- Notifications are sent to topic-priority.
- This differs from the AMQP drivers which send to topic.priority.
- """
- # NOTE(ewindisch): dot-priority in rpc notifier does not
- # work with our assumptions.
- topic = topic.replace('.', '-')
- cast(conf, context, topic, msg, envelope=envelope)
-
-
-def cleanup():
- """Clean up resources in use by implementation."""
- global ZMQ_CTX
- if ZMQ_CTX:
- ZMQ_CTX.term()
- ZMQ_CTX = None
-
- global matchmaker
- matchmaker = None
-
-
-def _get_ctxt():
- if not zmq:
- raise ImportError("Failed to import eventlet.green.zmq")
-
- global ZMQ_CTX
- if not ZMQ_CTX:
- ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
- return ZMQ_CTX
-
-
-def _get_matchmaker(*args, **kwargs):
- global matchmaker
- if not matchmaker:
- mm = CONF.rpc_zmq_matchmaker
- if mm.endswith('matchmaker.MatchMakerRing'):
- mm.replace('matchmaker', 'matchmaker_ring')
- LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
- ' %(new)s instead') % dict(
- orig=CONF.rpc_zmq_matchmaker, new=mm))
- matchmaker = importutils.import_object(mm, *args, **kwargs)
- return matchmaker
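For reference, the envelope format handled by the deleted driver is just a flattened key/value list; a minimal round-trip sketch (the flatten helper is illustrative, only unflatten_envelope existed in the module above):

    import itertools

    def flatten_envelope(envelope):
        # {1: 2, 3: 4} => [1, 2, 3, 4], the wire form the driver expects
        return list(itertools.chain.from_iterable(envelope.items()))

    def unflatten_envelope(packenv):
        # [1, 2, 3, 4] => {1: 2, 3: 4}, as in the deleted module above
        i = iter(packenv)
        return dict(zip(i, i))

    assert unflatten_envelope(flatten_envelope({'oslo.version': '2.0'})) == \
        {'oslo.version': '2.0'}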
diff --git a/ceilometer/openstack/common/rpc/matchmaker.py b/ceilometer/openstack/common/rpc/matchmaker.py
deleted file mode 100644
index a06338da..00000000
--- a/ceilometer/openstack/common/rpc/matchmaker.py
+++ /dev/null
@@ -1,323 +0,0 @@
-# Copyright 2011 Cloudscaling Group, Inc
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-The MatchMaker classes should accept a Topic or Fanout exchange key and
-return keys for direct exchanges, per (approximate) AMQP parlance.
-"""
-
-import contextlib
-
-import eventlet
-from oslo.config import cfg
-
-from ceilometer.openstack.common.gettextutils import _, _LI
-from ceilometer.openstack.common import log as logging
-
-
-matchmaker_opts = [
- cfg.IntOpt('matchmaker_heartbeat_freq',
- default=300,
- help='Heartbeat frequency'),
- cfg.IntOpt('matchmaker_heartbeat_ttl',
- default=600,
- help='Heartbeat time-to-live.'),
-]
-
-CONF = cfg.CONF
-CONF.register_opts(matchmaker_opts)
-LOG = logging.getLogger(__name__)
-contextmanager = contextlib.contextmanager
-
-
-class MatchMakerException(Exception):
- """Signified a match could not be found."""
- message = _("Match not found by MatchMaker.")
-
-
-class Exchange(object):
- """Implements lookups.
-
- Subclass this to support hashtables, dns, etc.
- """
- def __init__(self):
- pass
-
- def run(self, key):
- raise NotImplementedError()
-
-
-class Binding(object):
- """A binding on which to perform a lookup."""
- def __init__(self):
- pass
-
- def test(self, key):
- raise NotImplementedError()
-
-
-class MatchMakerBase(object):
- """Match Maker Base Class.
-
- Build off HeartbeatMatchMakerBase if building a heartbeat-capable
- MatchMaker.
- """
- def __init__(self):
- # Array of tuples. Index [2] toggles negation, [3] is last-if-true
- self.bindings = []
-
- self.no_heartbeat_msg = _('Matchmaker does not implement '
- 'registration or heartbeat.')
-
- def register(self, key, host):
- """Register a host on a backend.
-
- Heartbeats, if applicable, may keepalive registration.
- """
- pass
-
- def ack_alive(self, key, host):
- """Acknowledge that a key.host is alive.
-
- Used internally for updating heartbeats, but may also be used
- publicly to acknowledge a system is alive (i.e. rpc message
- successfully sent to host)
- """
- pass
-
- def is_alive(self, topic, host):
- """Checks if a host is alive."""
- pass
-
- def expire(self, topic, host):
- """Explicitly expire a host's registration."""
- pass
-
- def send_heartbeats(self):
- """Send all heartbeats.
-
- Use start_heartbeat to spawn a heartbeat greenthread,
- which loops this method.
- """
- pass
-
- def unregister(self, key, host):
- """Unregister a topic."""
- pass
-
- def start_heartbeat(self):
- """Spawn heartbeat greenthread."""
- pass
-
- def stop_heartbeat(self):
- """Destroys the heartbeat greenthread."""
- pass
-
- def add_binding(self, binding, rule, last=True):
- self.bindings.append((binding, rule, False, last))
-
- #NOTE(ewindisch): kept the following method in case we implement the
- # underlying support.
- #def add_negate_binding(self, binding, rule, last=True):
- # self.bindings.append((binding, rule, True, last))
-
- def queues(self, key):
- workers = []
-
- # bit is for negate bindings - if we choose to implement it.
- # last stops processing rules if this matches.
- for (binding, exchange, bit, last) in self.bindings:
- if binding.test(key):
- workers.extend(exchange.run(key))
-
- # Support last.
- if last:
- return workers
- return workers
-
-
-class HeartbeatMatchMakerBase(MatchMakerBase):
- """Base for a heart-beat capable MatchMaker.
-
- Provides common methods for registering, unregistering, and maintaining
- heartbeats.
- """
- def __init__(self):
- self.hosts = set()
- self._heart = None
- self.host_topic = {}
-
- super(HeartbeatMatchMakerBase, self).__init__()
-
- def send_heartbeats(self):
- """Send all heartbeats.
-
- Use start_heartbeat to spawn a heartbeat greenthread,
- which loops this method.
- """
- for key, host in self.host_topic:
- self.ack_alive(key, host)
-
- def ack_alive(self, key, host):
- """Acknowledge that a host.topic is alive.
-
- Used internally for updating heartbeats, but may also be used
- publicly to acknowledge a system is alive (i.e. rpc message
- successfully sent to host)
- """
- raise NotImplementedError("Must implement ack_alive")
-
- def backend_register(self, key, host):
- """Implements registration logic.
-
- Called by register(self,key,host)
- """
- raise NotImplementedError("Must implement backend_register")
-
- def backend_unregister(self, key, key_host):
- """Implements de-registration logic.
-
- Called by unregister(self,key,host)
- """
- raise NotImplementedError("Must implement backend_unregister")
-
- def register(self, key, host):
- """Register a host on a backend.
-
- Heartbeats, if applicable, may keepalive registration.
- """
- self.hosts.add(host)
- self.host_topic[(key, host)] = host
- key_host = '.'.join((key, host))
-
- self.backend_register(key, key_host)
-
- self.ack_alive(key, host)
-
- def unregister(self, key, host):
- """Unregister a topic."""
- if (key, host) in self.host_topic:
- del self.host_topic[(key, host)]
-
- self.hosts.discard(host)
- self.backend_unregister(key, '.'.join((key, host)))
-
- LOG.info(_LI("Matchmaker unregistered: %(key)s, %(host)s"),
- {'key': key, 'host': host})
-
- def start_heartbeat(self):
- """Implementation of MatchMakerBase.start_heartbeat.
-
- Launches greenthread looping send_heartbeats(),
- yielding for CONF.matchmaker_heartbeat_freq seconds
- between iterations.
- """
- if not self.hosts:
- raise MatchMakerException(
- _("Register before starting heartbeat."))
-
- def do_heartbeat():
- while True:
- self.send_heartbeats()
- eventlet.sleep(CONF.matchmaker_heartbeat_freq)
-
- self._heart = eventlet.spawn(do_heartbeat)
-
- def stop_heartbeat(self):
- """Destroys the heartbeat greenthread."""
- if self._heart:
- self._heart.kill()
-
-
-class DirectBinding(Binding):
- """Specifies a host in the key via a '.' character.
-
- Although dots are used in the key, the behavior here is
- that it maps directly to a host, thus direct.
- """
- def test(self, key):
- return '.' in key
-
-
-class TopicBinding(Binding):
- """Where a 'bare' key without dots.
-
- AMQP generally considers topic exchanges to be those *with* dots,
- but we deviate here in terminology as the behavior here matches
- that of a topic exchange (whereas where there are dots, behavior
- matches that of a direct exchange.
- """
- def test(self, key):
- return '.' not in key
-
-
-class FanoutBinding(Binding):
- """Match on fanout keys, where key starts with 'fanout.' string."""
- def test(self, key):
- return key.startswith('fanout~')
-
-
-class StubExchange(Exchange):
- """Exchange that does nothing."""
- def run(self, key):
- return [(key, None)]
-
-
-class LocalhostExchange(Exchange):
- """Exchange where all direct topics are local."""
- def __init__(self, host='localhost'):
- self.host = host
- super(Exchange, self).__init__()
-
- def run(self, key):
- return [('.'.join((key.split('.')[0], self.host)), self.host)]
-
-
-class DirectExchange(Exchange):
- """Exchange where all topic keys are split, sending to second half.
-
- i.e. "compute.host" sends a message to "compute.host" running on "host"
- """
- def __init__(self):
- super(Exchange, self).__init__()
-
- def run(self, key):
- e = key.split('.', 1)[1]
- return [(key, e)]
-
-
-class MatchMakerLocalhost(MatchMakerBase):
- """Match Maker where all bare topics resolve to localhost.
-
- Useful for testing.
- """
- def __init__(self, host='localhost'):
- super(MatchMakerLocalhost, self).__init__()
- self.add_binding(FanoutBinding(), LocalhostExchange(host))
- self.add_binding(DirectBinding(), DirectExchange())
- self.add_binding(TopicBinding(), LocalhostExchange(host))
-
-
-class MatchMakerStub(MatchMakerBase):
- """Match Maker where topics are untouched.
-
- Useful for testing, or for AMQP/brokered queues.
- Will not work where knowledge of hosts is known (i.e. zeromq)
- """
- def __init__(self):
- super(MatchMakerStub, self).__init__()
-
- self.add_binding(FanoutBinding(), StubExchange())
- self.add_binding(DirectBinding(), StubExchange())
- self.add_binding(TopicBinding(), StubExchange())
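How the deleted Binding/Exchange pairs resolve topics can be seen from a short usage sketch against MatchMakerLocalhost (kept here only for reference, since the module is removed; host names are made up):

    mm = MatchMakerLocalhost(host='node-1')

    # Bare topic -> TopicBinding -> LocalhostExchange
    assert mm.queues('metering') == [('metering.node-1', 'node-1')]

    # Dotted topic -> DirectBinding -> DirectExchange (routed to the named host)
    assert mm.queues('metering.node-2') == [('metering.node-2', 'node-2')]

    # Fanout topic -> FanoutBinding -> LocalhostExchange
    assert mm.queues('fanout~metering') == [('fanout~metering.node-1', 'node-1')]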
diff --git a/ceilometer/openstack/common/rpc/matchmaker_redis.py b/ceilometer/openstack/common/rpc/matchmaker_redis.py
deleted file mode 100644
index decf9fed..00000000
--- a/ceilometer/openstack/common/rpc/matchmaker_redis.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# Copyright 2013 Cloudscaling Group, Inc
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-The MatchMaker classes should accept a Topic or Fanout exchange key and
-return keys for direct exchanges, per (approximate) AMQP parlance.
-"""
-
-from oslo.config import cfg
-
-from ceilometer.openstack.common import importutils
-from ceilometer.openstack.common import log as logging
-from ceilometer.openstack.common.rpc import matchmaker as mm_common
-
-redis = importutils.try_import('redis')
-
-
-matchmaker_redis_opts = [
- cfg.StrOpt('host',
- default='127.0.0.1',
- help='Host to locate redis'),
- cfg.IntOpt('port',
- default=6379,
- help='Use this port to connect to redis host.'),
- cfg.StrOpt('password',
- default=None,
- help='Password for Redis server. (optional)'),
-]
-
-CONF = cfg.CONF
-opt_group = cfg.OptGroup(name='matchmaker_redis',
- title='Options for Redis-based MatchMaker')
-CONF.register_group(opt_group)
-CONF.register_opts(matchmaker_redis_opts, opt_group)
-LOG = logging.getLogger(__name__)
-
-
-class RedisExchange(mm_common.Exchange):
- def __init__(self, matchmaker):
- self.matchmaker = matchmaker
- self.redis = matchmaker.redis
- super(RedisExchange, self).__init__()
-
-
-class RedisTopicExchange(RedisExchange):
- """Exchange where all topic keys are split, sending to second half.
-
- i.e. "compute.host" sends a message to "compute" running on "host"
- """
- def run(self, topic):
- while True:
- member_name = self.redis.srandmember(topic)
-
- if not member_name:
- # If this happens, there are no
- # longer any members.
- break
-
- if not self.matchmaker.is_alive(topic, member_name):
- continue
-
- host = member_name.split('.', 1)[1]
- return [(member_name, host)]
- return []
-
-
-class RedisFanoutExchange(RedisExchange):
- """Return a list of all hosts."""
- def run(self, topic):
- topic = topic.split('~', 1)[1]
- hosts = self.redis.smembers(topic)
- good_hosts = filter(
- lambda host: self.matchmaker.is_alive(topic, host), hosts)
-
- return [(x, x.split('.', 1)[1]) for x in good_hosts]
-
-
-class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
- """MatchMaker registering and looking-up hosts with a Redis server."""
- def __init__(self):
- super(MatchMakerRedis, self).__init__()
-
- if not redis:
- raise ImportError("Failed to import module redis.")
-
- self.redis = redis.Redis(
- host=CONF.matchmaker_redis.host,
- port=CONF.matchmaker_redis.port,
- password=CONF.matchmaker_redis.password)
-
- self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
- self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
- self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
-
- def ack_alive(self, key, host):
- topic = "%s.%s" % (key, host)
- if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
- # If we could not update the expiration, the key
- # might have been pruned. Re-register, creating a new
- # key in Redis.
- self.register(self.topic_host[host], host)
-
- def is_alive(self, topic, host):
- if self.redis.ttl(host) == -1:
- self.expire(topic, host)
- return False
- return True
-
- def expire(self, topic, host):
- with self.redis.pipeline() as pipe:
- pipe.multi()
- pipe.delete(host)
- pipe.srem(topic, host)
- pipe.execute()
-
- def backend_register(self, key, key_host):
- with self.redis.pipeline() as pipe:
- pipe.multi()
- pipe.sadd(key, key_host)
-
- # No value is needed, we just
- # care if it exists. Sets aren't viable
- # because only keys can expire.
- pipe.set(key_host, '')
-
- pipe.execute()
-
- def backend_unregister(self, key, key_host):
- with self.redis.pipeline() as pipe:
- pipe.multi()
- pipe.srem(key, key_host)
- pipe.delete(key_host)
- pipe.execute()
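The Redis layout used by the deleted MatchMakerRedis boils down to a set per topic plus a per-member key whose TTL carries liveness; a rough equivalent in raw redis-py calls (assumes a local Redis server, names illustrative):

    import redis

    r = redis.Redis(host='127.0.0.1', port=6379)

    # backend_register('metering', 'metering.node-1')
    r.sadd('metering', 'metering.node-1')   # topic set of members
    r.set('metering.node-1', '')            # per-member key; only its TTL matters

    # ack_alive() refreshes the TTL so the member stays "alive"
    r.expire('metering.node-1', 600)        # matchmaker_heartbeat_ttl

    # backend_unregister('metering', 'metering.node-1')
    r.srem('metering', 'metering.node-1')
    r.delete('metering.node-1')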
diff --git a/ceilometer/openstack/common/rpc/matchmaker_ring.py b/ceilometer/openstack/common/rpc/matchmaker_ring.py
deleted file mode 100644
index 9d78ff1f..00000000
--- a/ceilometer/openstack/common/rpc/matchmaker_ring.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Copyright 2011-2013 Cloudscaling Group, Inc
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-The MatchMaker classes should accept a Topic or Fanout exchange key and
-return keys for direct exchanges, per (approximate) AMQP parlance.
-"""
-
-import itertools
-import json
-
-from oslo.config import cfg
-
-from ceilometer.openstack.common.gettextutils import _LW
-from ceilometer.openstack.common import log as logging
-from ceilometer.openstack.common.rpc import matchmaker as mm
-
-
-matchmaker_opts = [
- # Matchmaker ring file
- cfg.StrOpt('ringfile',
- deprecated_name='matchmaker_ringfile',
- deprecated_group='DEFAULT',
- default='/etc/oslo/matchmaker_ring.json',
- help='Matchmaker ring file (JSON)'),
-]
-
-CONF = cfg.CONF
-CONF.register_opts(matchmaker_opts, 'matchmaker_ring')
-LOG = logging.getLogger(__name__)
-
-
-class RingExchange(mm.Exchange):
- """Match Maker where hosts are loaded from a static JSON formatted file.
-
-    __init__ takes an optional ring dictionary argument, otherwise
-    loads the ringfile from CONF.matchmaker_ring.ringfile.
- """
- def __init__(self, ring=None):
- super(RingExchange, self).__init__()
-
- if ring:
- self.ring = ring
- else:
- with open(CONF.matchmaker_ring.ringfile, 'r') as fh:
- self.ring = json.load(fh)
-
- self.ring0 = {}
- for k in self.ring.keys():
- self.ring0[k] = itertools.cycle(self.ring[k])
-
- def _ring_has(self, key):
- return key in self.ring0
-
-
-class RoundRobinRingExchange(RingExchange):
- """A Topic Exchange based on a hashmap."""
- def __init__(self, ring=None):
- super(RoundRobinRingExchange, self).__init__(ring)
-
- def run(self, key):
- if not self._ring_has(key):
- LOG.warn(
- _LW("No key defining hosts for topic '%s', "
- "see ringfile") % (key, )
- )
- return []
- host = next(self.ring0[key])
- return [(key + '.' + host, host)]
-
-
-class FanoutRingExchange(RingExchange):
- """Fanout Exchange based on a hashmap."""
- def __init__(self, ring=None):
- super(FanoutRingExchange, self).__init__(ring)
-
- def run(self, key):
- # Assume starts with "fanout~", strip it for lookup.
- nkey = key.split('fanout~')[1:][0]
- if not self._ring_has(nkey):
- LOG.warn(
- _LW("No key defining hosts for topic '%s', "
- "see ringfile") % (nkey, )
- )
- return []
- return map(lambda x: (key + '.' + x, x), self.ring[nkey])
-
-
-class MatchMakerRing(mm.MatchMakerBase):
- """Match Maker where hosts are loaded from a static hashmap."""
- def __init__(self, ring=None):
- super(MatchMakerRing, self).__init__()
- self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
- self.add_binding(mm.DirectBinding(), mm.DirectExchange())
- self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring))
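A small usage sketch of the deleted ring exchanges, with an inline ring equivalent to the JSON ringfile (topics and hosts are made up):

    ring = {
        'metering': ['node-1', 'node-2'],
        'alarm_notifier': ['node-3'],
    }

    rr = RoundRobinRingExchange(ring)
    assert rr.run('metering') == [('metering.node-1', 'node-1')]
    assert rr.run('metering') == [('metering.node-2', 'node-2')]  # round-robin

    fan = FanoutRingExchange(ring)
    assert fan.run('fanout~metering') == [('fanout~metering.node-1', 'node-1'),
                                          ('fanout~metering.node-2', 'node-2')]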
diff --git a/ceilometer/openstack/common/rpc/proxy.py b/ceilometer/openstack/common/rpc/proxy.py
deleted file mode 100644
index de9bf572..00000000
--- a/ceilometer/openstack/common/rpc/proxy.py
+++ /dev/null
@@ -1,225 +0,0 @@
-# Copyright 2012-2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-A helper class for proxy objects to remote APIs.
-
-For more information about rpc API version numbers, see:
- rpc/dispatcher.py
-"""
-
-import six
-
-from ceilometer.openstack.common import rpc
-from ceilometer.openstack.common.rpc import common as rpc_common
-from ceilometer.openstack.common.rpc import serializer as rpc_serializer
-
-
-class RpcProxy(object):
- """A helper class for rpc clients.
-
- This class is a wrapper around the RPC client API. It allows you to
- specify the topic and API version in a single place. This is intended to
- be used as a base class for a class that implements the client side of an
- rpc API.
- """
-
- # The default namespace, which can be overridden in a subclass.
- RPC_API_NAMESPACE = None
-
- def __init__(self, topic, default_version, version_cap=None,
- serializer=None):
- """Initialize an RpcProxy.
-
- :param topic: The topic to use for all messages.
- :param default_version: The default API version to request in all
- outgoing messages. This can be overridden on a per-message
- basis.
- :param version_cap: Optionally cap the maximum version used for sent
- messages.
-        :param serializer: Optionally (de-)serialize entities with a
- provided helper.
- """
- self.topic = topic
- self.default_version = default_version
- self.version_cap = version_cap
- if serializer is None:
- serializer = rpc_serializer.NoOpSerializer()
- self.serializer = serializer
- super(RpcProxy, self).__init__()
-
- def _set_version(self, msg, vers):
- """Helper method to set the version in a message.
-
- :param msg: The message having a version added to it.
- :param vers: The version number to add to the message.
- """
- v = vers if vers else self.default_version
- if (self.version_cap and not
- rpc_common.version_is_compatible(self.version_cap, v)):
- raise rpc_common.RpcVersionCapError(version_cap=self.version_cap)
- msg['version'] = v
-
- def _get_topic(self, topic):
- """Return the topic to use for a message."""
- return topic if topic else self.topic
-
- def can_send_version(self, version):
- """Check to see if a version is compatible with the version cap."""
- return (not self.version_cap or
- rpc_common.version_is_compatible(self.version_cap, version))
-
- @staticmethod
- def make_namespaced_msg(method, namespace, **kwargs):
- return {'method': method, 'namespace': namespace, 'args': kwargs}
-
- def make_msg(self, method, **kwargs):
- return self.make_namespaced_msg(method, self.RPC_API_NAMESPACE,
- **kwargs)
-
- def _serialize_msg_args(self, context, kwargs):
- """Helper method called to serialize message arguments.
-
- This calls our serializer on each argument, returning a new
- set of args that have been serialized.
-
- :param context: The request context
- :param kwargs: The arguments to serialize
- :returns: A new set of serialized arguments
- """
- new_kwargs = dict()
- for argname, arg in six.iteritems(kwargs):
- new_kwargs[argname] = self.serializer.serialize_entity(context,
- arg)
- return new_kwargs
-
- def call(self, context, msg, topic=None, version=None, timeout=None):
- """rpc.call() a remote method.
-
- :param context: The request context
- :param msg: The message to send, including the method and args.
- :param topic: Override the topic for this message.
- :param version: (Optional) Override the requested API version in this
- message.
- :param timeout: (Optional) A timeout to use when waiting for the
- response. If no timeout is specified, a default timeout will be
- used that is usually sufficient.
-
- :returns: The return value from the remote method.
- """
- self._set_version(msg, version)
- msg['args'] = self._serialize_msg_args(context, msg['args'])
- real_topic = self._get_topic(topic)
- try:
- result = rpc.call(context, real_topic, msg, timeout)
- return self.serializer.deserialize_entity(context, result)
- except rpc.common.Timeout as exc:
- raise rpc.common.Timeout(
- exc.info, real_topic, msg.get('method'))
-
- def multicall(self, context, msg, topic=None, version=None, timeout=None):
- """rpc.multicall() a remote method.
-
- :param context: The request context
- :param msg: The message to send, including the method and args.
- :param topic: Override the topic for this message.
- :param version: (Optional) Override the requested API version in this
- message.
- :param timeout: (Optional) A timeout to use when waiting for the
- response. If no timeout is specified, a default timeout will be
- used that is usually sufficient.
-
- :returns: An iterator that lets you process each of the returned values
- from the remote method as they arrive.
- """
- self._set_version(msg, version)
- msg['args'] = self._serialize_msg_args(context, msg['args'])
- real_topic = self._get_topic(topic)
- try:
- result = rpc.multicall(context, real_topic, msg, timeout)
- return self.serializer.deserialize_entity(context, result)
- except rpc.common.Timeout as exc:
- raise rpc.common.Timeout(
- exc.info, real_topic, msg.get('method'))
-
- def cast(self, context, msg, topic=None, version=None):
- """rpc.cast() a remote method.
-
- :param context: The request context
- :param msg: The message to send, including the method and args.
- :param topic: Override the topic for this message.
- :param version: (Optional) Override the requested API version in this
- message.
-
- :returns: None. rpc.cast() does not wait on any return value from the
- remote method.
- """
- self._set_version(msg, version)
- msg['args'] = self._serialize_msg_args(context, msg['args'])
- rpc.cast(context, self._get_topic(topic), msg)
-
- def fanout_cast(self, context, msg, topic=None, version=None):
- """rpc.fanout_cast() a remote method.
-
- :param context: The request context
- :param msg: The message to send, including the method and args.
- :param topic: Override the topic for this message.
- :param version: (Optional) Override the requested API version in this
- message.
-
- :returns: None. rpc.fanout_cast() does not wait on any return value
- from the remote method.
- """
- self._set_version(msg, version)
- msg['args'] = self._serialize_msg_args(context, msg['args'])
- rpc.fanout_cast(context, self._get_topic(topic), msg)
-
- def cast_to_server(self, context, server_params, msg, topic=None,
- version=None):
- """rpc.cast_to_server() a remote method.
-
- :param context: The request context
- :param server_params: Server parameters. See rpc.cast_to_server() for
- details.
- :param msg: The message to send, including the method and args.
- :param topic: Override the topic for this message.
- :param version: (Optional) Override the requested API version in this
- message.
-
- :returns: None. rpc.cast_to_server() does not wait on any
- return values.
- """
- self._set_version(msg, version)
- msg['args'] = self._serialize_msg_args(context, msg['args'])
- rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
-
- def fanout_cast_to_server(self, context, server_params, msg, topic=None,
- version=None):
- """rpc.fanout_cast_to_server() a remote method.
-
- :param context: The request context
- :param server_params: Server parameters. See rpc.cast_to_server() for
- details.
- :param msg: The message to send, including the method and args.
- :param topic: Override the topic for this message.
- :param version: (Optional) Override the requested API version in this
- message.
-
- :returns: None. rpc.fanout_cast_to_server() does not wait on any
- return values.
- """
- self._set_version(msg, version)
- msg['args'] = self._serialize_msg_args(context, msg['args'])
- rpc.fanout_cast_to_server(context, server_params,
- self._get_topic(topic), msg)
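Typical pre-patch usage of the deleted RpcProxy was a thin subclass per RPC API; a hedged sketch, with the class, topic and method names invented for illustration:

    class MeteringAPI(RpcProxy):
        def __init__(self):
            super(MeteringAPI, self).__init__(topic='metering',
                                              default_version='1.0')

        def record_metering_data(self, context, data):
            # fire-and-forget: argument serialization, topic and version are
            # handled by the base class
            return self.cast(context,
                             self.make_msg('record_metering_data', data=data))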
diff --git a/ceilometer/openstack/common/rpc/serializer.py b/ceilometer/openstack/common/rpc/serializer.py
deleted file mode 100644
index 9bc6e2a3..00000000
--- a/ceilometer/openstack/common/rpc/serializer.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright 2013 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""Provides the definition of an RPC serialization handler"""
-
-import abc
-
-import six
-
-
-@six.add_metaclass(abc.ABCMeta)
-class Serializer(object):
- """Generic (de-)serialization definition base class."""
-
- @abc.abstractmethod
- def serialize_entity(self, context, entity):
- """Serialize something to primitive form.
-
- :param context: Security context
- :param entity: Entity to be serialized
- :returns: Serialized form of entity
- """
- pass
-
- @abc.abstractmethod
- def deserialize_entity(self, context, entity):
- """Deserialize something from primitive form.
-
- :param context: Security context
- :param entity: Primitive to be deserialized
- :returns: Deserialized form of entity
- """
- pass
-
-
-class NoOpSerializer(Serializer):
- """A serializer that does nothing."""
-
- def serialize_entity(self, context, entity):
- return entity
-
- def deserialize_entity(self, context, entity):
- return entity
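A minimal concrete Serializer sketch (illustrative only) to show the contract the deleted base class defined:

    import datetime

    class DatetimeSerializer(Serializer):
        """Primitive-ize datetimes on the way out, pass everything else."""

        def serialize_entity(self, context, entity):
            if isinstance(entity, datetime.datetime):
                return entity.isoformat()
            return entity

        def deserialize_entity(self, context, entity):
            return entity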
diff --git a/ceilometer/openstack/common/rpc/service.py b/ceilometer/openstack/common/rpc/service.py
deleted file mode 100644
index 8e22a7bc..00000000
--- a/ceilometer/openstack/common/rpc/service.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Copyright 2010 United States Government as represented by the
-# Administrator of the National Aeronautics and Space Administration.
-# All Rights Reserved.
-# Copyright 2011 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from ceilometer.openstack.common import log as logging
-from ceilometer.openstack.common import rpc
-from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
-from ceilometer.openstack.common import service
-
-
-LOG = logging.getLogger(__name__)
-
-
-class Service(service.Service):
- """Service object for binaries running on hosts.
-
- A service enables rpc by listening to queues based on topic and host.
- """
- def __init__(self, host, topic, manager=None, serializer=None):
- super(Service, self).__init__()
- self.host = host
- self.topic = topic
- self.serializer = serializer
- if manager is None:
- self.manager = self
- else:
- self.manager = manager
-
- def start(self):
- super(Service, self).start()
-
- self.conn = rpc.create_connection(new=True)
- LOG.debug("Creating Consumer connection for Service %s" %
- self.topic)
-
- dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
- self.serializer)
-
- # Share this same connection for these Consumers
- self.conn.create_consumer(self.topic, dispatcher, fanout=False)
-
- node_topic = '%s.%s' % (self.topic, self.host)
- self.conn.create_consumer(node_topic, dispatcher, fanout=False)
-
- self.conn.create_consumer(self.topic, dispatcher, fanout=True)
-
- # Hook to allow the manager to do other initializations after
- # the rpc connection is created.
- if callable(getattr(self.manager, 'initialize_service_hook', None)):
- self.manager.initialize_service_hook(self)
-
- # Consume from all consumers in a thread
- self.conn.consume_in_thread()
-
- def stop(self):
- # Try to shut the connection down, but if we get any sort of
- # errors, go ahead and ignore them.. as we're shutting down anyway
- try:
- self.conn.close()
- except Exception:
- pass
- super(Service, self).stop()
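Under oslo.messaging the topic, topic.host and fanout consumers created by the deleted Service.start() are handled by a single RPC server; a rough equivalent sketch (the manager endpoint and host name are illustrative, and ceilometer's new messaging helper is assumed to wrap similar calls):

    import oslo.messaging
    from oslo.config import cfg

    class Manager(object):
        def record_metering_data(self, context, data):
            pass

    transport = oslo.messaging.get_transport(cfg.CONF)
    target = oslo.messaging.Target(topic='metering', server='node-1')
    server = oslo.messaging.get_rpc_server(transport, target,
                                           endpoints=[Manager()],
                                           executor='eventlet')
    server.start()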
diff --git a/ceilometer/openstack/common/rpc/zmq_receiver.py b/ceilometer/openstack/common/rpc/zmq_receiver.py
deleted file mode 100644
index 654b52ab..00000000
--- a/ceilometer/openstack/common/rpc/zmq_receiver.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Copyright 2011 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import eventlet
-eventlet.monkey_patch()
-
-import contextlib
-import sys
-
-from oslo.config import cfg
-
-from ceilometer.openstack.common import log as logging
-from ceilometer.openstack.common import rpc
-from ceilometer.openstack.common.rpc import impl_zmq
-
-CONF = cfg.CONF
-CONF.register_opts(rpc.rpc_opts)
-CONF.register_opts(impl_zmq.zmq_opts)
-
-
-def main():
- CONF(sys.argv[1:], project='oslo')
- logging.setup("oslo")
-
- with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
- reactor.consume_in_thread()
- reactor.wait()
diff --git a/ceilometer/orchestration/notifications.py b/ceilometer/orchestration/notifications.py
index d9abb752..83100e0b 100644
--- a/ceilometer/orchestration/notifications.py
+++ b/ceilometer/orchestration/notifications.py
@@ -16,6 +16,7 @@
"""
from oslo.config import cfg
+import oslo.messaging
from ceilometer import plugin
from ceilometer import sample
@@ -46,13 +47,13 @@ class StackCRUD(plugin.NotificationBase):
]
@staticmethod
- def get_exchange_topics(conf):
- return [
- plugin.ExchangeTopics(
- exchange=conf.heat_control_exchange,
- topics=set(topic + ".info"
- for topic in conf.notification_topics)),
- ]
+ def get_targets(conf):
+ """Return a sequence of oslo.messaging.Target defining the exchange and
+ topics to be connected for this plugin.
+ """
+ return [oslo.messaging.Target(topic=topic,
+ exchange=conf.heat_control_exchange)
+ for topic in conf.notification_topics]
def process_notification(self, message):
name = message['event_type'] \
diff --git a/ceilometer/plugin.py b/ceilometer/plugin.py
index d553fef9..53d6de61 100644
--- a/ceilometer/plugin.py
+++ b/ceilometer/plugin.py
@@ -22,13 +22,13 @@ import abc
import collections
import fnmatch
-from oslo.config import cfg
+import oslo.messaging
import six
-# Import this option so every Notification plugin can use it freely.
-cfg.CONF.import_opt('notification_topics',
- 'ceilometer.openstack.common.notifier.rpc_notifier')
+from ceilometer.openstack.common.gettextutils import _ # noqa
+from ceilometer.openstack.common import log
+LOG = log.getLogger(__name__)
ExchangeTopics = collections.namedtuple('ExchangeTopics',
['exchange', 'topics'])
@@ -49,14 +49,25 @@ class NotificationBase(PluginBase):
given to this plugin.
"""
- @abc.abstractmethod
- def get_exchange_topics(self, conf):
- """Return a sequence of ExchangeTopics defining the exchange and
+ def get_targets(self, conf):
+ """Return a sequence of oslo.messaging.Target defining the exchange and
topics to be connected for this plugin.
:param conf: Configuration.
"""
+ #TODO(sileht): Backwards compatibility, remove in J+1
+ if hasattr(self, 'get_exchange_topics'):
+            LOG.warn(_('get_exchange_topics API of NotificationPlugin is '
+                       'deprecated, implement get_targets instead.'))
+
+ targets = []
+ for exchange, topics in self.get_exchange_topics(conf):
+ targets.extend([oslo.messaging.Target(topic=topic,
+ exchange=exchange)
+ for topic in topics])
+ return targets
+
@abc.abstractmethod
def process_notification(self, message):
"""Return a sequence of Counter instances for the given message.
diff --git a/ceilometer/publisher/rpc.py b/ceilometer/publisher/rpc.py
index f6338eab..8f1a602f 100644
--- a/ceilometer/publisher/rpc.py
+++ b/ceilometer/publisher/rpc.py
@@ -24,10 +24,12 @@ import operator
import six.moves.urllib.parse as urlparse
from oslo.config import cfg
+import oslo.messaging
+import oslo.messaging._drivers.common
+from ceilometer import messaging
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
-from ceilometer.openstack.common import rpc
from ceilometer import publisher
from ceilometer.publisher import utils
@@ -52,21 +54,13 @@ def register_opts(config):
register_opts(cfg.CONF)
-def import_backend_retry_config():
- """Import the retry config option native to the configured
- rpc backend (if such a native config option exists).
- """
- cfg.CONF.import_opt('rpc_backend',
- 'ceilometer.openstack.common.rpc')
- kombu = 'ceilometer.openstack.common.rpc.impl_kombu'
- if cfg.CONF.rpc_backend == kombu:
- try:
- cfg.CONF.import_opt('rabbit_max_retries', kombu)
- except ImportError:
- pass
-
-
-import_backend_retry_config()
+def oslo_messaging_is_rabbit():
+ kombu = ['ceilometer.openstack.common.rpc.impl_kombu',
+             'oslo.messaging._drivers.impl_rabbit:RabbitDriver',
+ 'rabbit']
+ return cfg.CONF.rpc_backend in kombu or (
+ cfg.CONF.transport_url and
+ cfg.CONF.transport_url.startswith('rabbit://'))
def override_backend_retry_config(value):
@@ -75,11 +69,11 @@ def override_backend_retry_config(value):
:param value: the value to override
"""
- # TODO(eglynn): ultimately we should add to olso a more generic concept
+    # TODO(sileht): ultimately we should add to oslo a more generic concept
# of retry config (i.e. not specific to an individual AMQP provider)
# see: https://bugs.launchpad.net/ceilometer/+bug/1244698
- kombu = 'ceilometer.openstack.common.rpc.impl_kombu'
- if cfg.CONF.rpc_backend == kombu:
+ # and: https://bugs.launchpad.net/oslo.messaging/+bug/1282639
+ if oslo_messaging_is_rabbit():
if 'rabbit_max_retries' in cfg.CONF:
cfg.CONF.set_override('rabbit_max_retries', value)
@@ -106,7 +100,6 @@ class RPCPublisher(publisher.PublisherBase):
LOG.info(_('Publishing policy set to %s, '
'override backend retry config to 1') % self.policy)
override_backend_retry_config(1)
-
elif self.policy == 'default':
LOG.info(_('Publishing policy set to %s') % self.policy)
else:
@@ -114,6 +107,8 @@ class RPCPublisher(publisher.PublisherBase):
% self.policy)
self.policy = 'default'
+ self.rpc_client = messaging.get_rpc_client(version='1.0')
+
def publish_samples(self, context, samples):
"""Publish samples on RPC.
@@ -130,28 +125,19 @@ class RPCPublisher(publisher.PublisherBase):
]
topic = cfg.CONF.publisher_rpc.metering_topic
- msg = {
- 'method': self.target,
- 'version': '1.0',
- 'args': {'data': meters},
- }
LOG.audit(_('Publishing %(m)d samples on %(t)s') % (
- {'m': len(msg['args']['data']), 't': topic}))
- self.local_queue.append((context, topic, msg))
+ {'m': len(meters), 't': topic}))
+ self.local_queue.append((context, topic, meters))
if self.per_meter_topic:
for meter_name, meter_list in itertools.groupby(
sorted(meters, key=operator.itemgetter('counter_name')),
operator.itemgetter('counter_name')):
- msg = {
- 'method': self.target,
- 'version': '1.0',
- 'args': {'data': list(meter_list)},
- }
+ meter_list = list(meter_list)
topic_name = topic + '.' + meter_name
LOG.audit(_('Publishing %(m)d samples on %(n)s') % (
- {'m': len(msg['args']['data']), 'n': topic_name}))
- self.local_queue.append((context, topic_name, msg))
+ {'m': len(meter_list), 'n': topic_name}))
+ self.local_queue.append((context, topic_name, meter_list))
self.flush()
@@ -177,8 +163,7 @@ class RPCPublisher(publisher.PublisherBase):
LOG.warn(_("Publisher max local_queue length is exceeded, "
"dropping %d oldest samples") % count)
- @staticmethod
- def _process_queue(queue, policy):
+ def _process_queue(self, queue, policy):
#note(sileht):
# the behavior of rpc.cast call depends of rabbit_max_retries
# if rabbit_max_retries <= 0:
@@ -186,19 +171,16 @@ class RPCPublisher(publisher.PublisherBase):
# if rabbit_max_retries > 0:
# it raises a exception if rabbitmq is unreachable
#
- # Ugly, but actually the oslo.rpc do a sys.exit(1) instead of a
- # RPCException, so we catch both until a correct behavior is
- # implemented in oslo
- #
# the default policy just respect the rabbitmq configuration
# nothing special is done if rabbit_max_retries <= 0
# and exception is reraised if rabbit_max_retries > 0
while queue:
- context, topic, msg = queue[0]
+ context, topic, meters = queue[0]
try:
- rpc.cast(context, topic, msg)
- except (SystemExit, rpc.common.RPCException):
- samples = sum([len(m['args']['data']) for n, n, m in queue])
+ self.rpc_client.prepare(topic=topic).cast(
+ context.to_dict(), self.target, data=meters)
+ except oslo.messaging._drivers.common.RPCException:
+ samples = sum([len(m) for __, __, m in queue])
if policy == 'queue':
LOG.warn(_("Failed to publish %d samples, queue them"),
samples)
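The cast above goes through an oslo.messaging RPC client prepared per topic; a hedged sketch of the equivalent direct calls (ceilometer.messaging.get_rpc_client is assumed to wrap RPCClient, and the method name and payload are illustrative):

    import oslo.messaging
    from oslo.config import cfg

    transport = oslo.messaging.get_transport(cfg.CONF)
    target = oslo.messaging.Target(topic='metering', version='1.0')
    client = oslo.messaging.RPCClient(transport, target)

    # equivalent of self.rpc_client.prepare(topic=topic).cast(...)
    cctxt = client.prepare(topic='metering')
    cctxt.cast({}, 'record_metering_data', data=[])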
diff --git a/ceilometer/service.py b/ceilometer/service.py
index 2095068e..fd4964fd 100644
--- a/ceilometer/service.py
+++ b/ceilometer/service.py
@@ -24,10 +24,10 @@ import sys
from oslo.config import cfg
from stevedore import named
+from ceilometer import messaging
from ceilometer.openstack.common import gettextutils
from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log
-from ceilometer.openstack.common import rpc
from ceilometer import utils
@@ -135,7 +135,6 @@ def get_workers(name):
def prepare_service(argv=None):
gettextutils.install('ceilometer', lazy=True)
gettextutils.enable_lazy()
- rpc.set_defaults(control_exchange='ceilometer')
cfg.set_defaults(log.log_opts,
default_log_levels=['amqplib=WARN',
'qpid.messaging=INFO',
@@ -149,3 +148,4 @@ def prepare_service(argv=None):
argv = sys.argv
cfg.CONF(argv[1:], project='ceilometer')
log.setup('ceilometer')
+ messaging.setup()
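The new ceilometer.messaging helper is used consistently across this patch; a usage summary sketch drawn largely from the changes themselves (the RPC method name and payloads below are illustrative):

    from ceilometer import messaging

    messaging.setup()            # production: transport comes from the config
    messaging.setup('fake://')   # tests: in-memory fake transport

    client = messaging.get_rpc_client(version='1.0')
    client.prepare(topic='metering').cast({}, 'record_metering_data', data=[])

    notifier = messaging.get_notifier(publisher_id='ceilometer.api')
    notifier.info({}, 'alarm.creation', {'detail': {'name': 'sent_notification'}})

    messaging.cleanup()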
diff --git a/ceilometer/tests/alarm/partition/test_coordination.py b/ceilometer/tests/alarm/partition/test_coordination.py
index bdd8e72a..707122ea 100644
--- a/ceilometer/tests/alarm/partition/test_coordination.py
+++ b/ceilometer/tests/alarm/partition/test_coordination.py
@@ -26,6 +26,7 @@ import mock
from six import moves
from ceilometer.alarm.partition import coordination
+from ceilometer import messaging
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import test
from ceilometer.openstack.common import timeutils
@@ -36,6 +37,9 @@ class TestCoordinate(test.BaseTestCase):
def setUp(self):
super(TestCoordinate, self).setUp()
self.CONF = self.useFixture(config.Config()).conf
+ messaging.setup('fake://')
+ self.addCleanup(messaging.cleanup)
+
self.test_interval = 120
self.CONF.set_override('evaluation_interval',
self.test_interval,
diff --git a/ceilometer/tests/alarm/test_notifier.py b/ceilometer/tests/alarm/test_notifier.py
index 583ac6fa..8f282833 100644
--- a/ceilometer/tests/alarm/test_notifier.py
+++ b/ceilometer/tests/alarm/test_notifier.py
@@ -21,6 +21,7 @@ import mock
import requests
from ceilometer.alarm import service
+from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import test
@@ -41,15 +42,18 @@ class TestAlarmNotifier(test.BaseTestCase):
def setUp(self):
super(TestAlarmNotifier, self).setUp()
+ messaging.setup('fake://')
+ self.addCleanup(messaging.cleanup)
+
self.CONF = self.useFixture(config.Config()).conf
- self.service = service.AlarmNotifierService('somehost', 'sometopic')
+ self.service = service.AlarmNotifierService()
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_init_host(self):
# If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the service
# configuration.
- with mock.patch('ceilometer.openstack.common.rpc.create_connection'):
+ with mock.patch.object(self.service.rpc_server, 'start'):
self.service.start()
def test_notify_alarm(self):
diff --git a/ceilometer/tests/alarm/test_partitioned_alarm_svc.py b/ceilometer/tests/alarm/test_partitioned_alarm_svc.py
index 18fed237..7b22d700 100644
--- a/ceilometer/tests/alarm/test_partitioned_alarm_svc.py
+++ b/ceilometer/tests/alarm/test_partitioned_alarm_svc.py
@@ -18,11 +18,11 @@
"""Tests for ceilometer.alarm.service.PartitionedAlarmService.
"""
import contextlib
-
import mock
from stevedore import extension
from ceilometer.alarm import service
+from ceilometer import messaging
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import test
@@ -30,6 +30,9 @@ from ceilometer.openstack.common import test
class TestPartitionedAlarmService(test.BaseTestCase):
def setUp(self):
super(TestPartitionedAlarmService, self).setUp()
+ messaging.setup('fake://')
+ self.addCleanup(messaging.cleanup)
+
self.threshold_eval = mock.Mock()
self.api_client = mock.MagicMock()
self.CONF = self.useFixture(config.Config()).conf
@@ -60,10 +63,9 @@ class TestPartitionedAlarmService(test.BaseTestCase):
test_interval,
group='alarm')
get_client = 'ceilometerclient.client.get_client'
- create_conn = 'ceilometer.openstack.common.rpc.create_connection'
- with contextlib.nested(mock.patch(get_client,
- return_value=self.api_client),
- mock.patch(create_conn)):
+ with contextlib.nested(
+ mock.patch(get_client, return_value=self.api_client),
+ mock.patch.object(self.partitioned.rpc_server, 'start')):
self.partitioned.start()
pc = self.partitioned.partition_coordinator
expected = [
diff --git a/ceilometer/tests/alarm/test_rpc.py b/ceilometer/tests/alarm/test_rpc.py
index 792b6b24..d798c4e2 100644
--- a/ceilometer/tests/alarm/test_rpc.py
+++ b/ceilometer/tests/alarm/test_rpc.py
@@ -22,26 +22,27 @@ from ceilometerclient.v2 import alarms
import mock
from ceilometer.alarm import rpc as rpc_alarm
-from ceilometer.openstack.common.fixture import config
+from ceilometer import messaging
from ceilometer.openstack.common.fixture import mockpatch
-from ceilometer.openstack.common import rpc
from ceilometer.openstack.common import test
from ceilometer.openstack.common import timeutils
from ceilometer.storage import models
class TestRPCAlarmNotifier(test.BaseTestCase):
- def fake_cast(self, context, topic, msg):
- self.notified.append((topic, msg))
- self.CONF = self.useFixture(config.Config()).conf
+ def fake_cast(self, context, method, **args):
+ self.notified.append((method, args))
def setUp(self):
super(TestRPCAlarmNotifier, self).setUp()
+ messaging.setup('fake://')
+ self.addCleanup(messaging.cleanup)
+
self.notified = []
+ self.notifier = rpc_alarm.RPCAlarmNotifier()
self.useFixture(mockpatch.PatchObject(
- rpc, 'cast',
+ self.notifier.client, 'cast',
side_effect=self.fake_cast))
- self.notifier = rpc_alarm.RPCAlarmNotifier()
self.alarms = [
alarms.Alarm(None, info={
'name': 'instance_running_hot',
@@ -77,6 +78,10 @@ class TestRPCAlarmNotifier(test.BaseTestCase):
}),
]
+ def test_rpc_target(self):
+ topic = self.notifier.client.target.topic
+ self.assertEqual('alarm_notifier', topic)
+
def test_notify_alarm(self):
previous = ['alarm', 'ok']
for i, a in enumerate(self.alarms):
@@ -85,25 +90,22 @@ class TestRPCAlarmNotifier(test.BaseTestCase):
self.assertEqual(2, len(self.notified))
for i, a in enumerate(self.alarms):
actions = getattr(a, models.Alarm.ALARM_ACTIONS_MAP[a.state])
- self.assertEqual(self.CONF.alarm.notifier_rpc_topic,
- self.notified[i][0])
+ self.assertEqual('notify_alarm', self.notified[i][0])
self.assertEqual(self.alarms[i].alarm_id,
- self.notified[i][1]["args"]["data"]["alarm_id"])
- self.assertEqual(actions,
- self.notified[i][1]["args"]["data"]["actions"])
+ self.notified[i][1]["data"]["alarm_id"])
+ self.assertEqual(actions, self.notified[i][1]["data"]["actions"])
self.assertEqual(previous[i],
- self.notified[i][1]["args"]["data"]["previous"])
+ self.notified[i][1]["data"]["previous"])
self.assertEqual(self.alarms[i].state,
- self.notified[i][1]["args"]["data"]["current"])
+ self.notified[i][1]["data"]["current"])
self.assertEqual("what? %d" % i,
- self.notified[i][1]["args"]["data"]["reason"])
- self.assertEqual(
- {'fire': '%d' % i},
- self.notified[i][1]["args"]["data"]["reason_data"])
+ self.notified[i][1]["data"]["reason"])
+ self.assertEqual({'fire': '%d' % i},
+ self.notified[i][1]["data"]["reason_data"])
def test_notify_non_string_reason(self):
self.notifier.notify(self.alarms[0], 'ok', 42, {})
- reason = self.notified[0][1]['args']['data']['reason']
+ reason = self.notified[0][1]['data']['reason']
self.assertIsInstance(reason, basestring)
def test_notify_no_actions(self):
@@ -128,42 +130,48 @@ class TestRPCAlarmNotifier(test.BaseTestCase):
class TestRPCAlarmPartitionCoordination(test.BaseTestCase):
- def fake_fanout_cast(self, context, topic, msg):
- self.notified.append((topic, msg))
+ def fake_fanout_cast(self, context, method, **args):
+ self.notified.append((method, args))
+
+ def fake_prepare(self, fanout):
+ self.assertTrue(fanout)
+ cctxt = mock.Mock()
+ cctxt.cast.side_effect = self.fake_fanout_cast
+ return cctxt
def setUp(self):
super(TestRPCAlarmPartitionCoordination, self).setUp()
+ messaging.setup('fake://')
+ self.addCleanup(messaging.cleanup)
+
self.notified = []
- self.useFixture(mockpatch.PatchObject(
- rpc, 'fanout_cast',
- side_effect=self.fake_fanout_cast))
self.ordination = rpc_alarm.RPCAlarmPartitionCoordination()
+ self.useFixture(mockpatch.PatchObject(
+ self.ordination.client, 'prepare',
+ side_effect=self.fake_prepare))
self.alarms = [mock.MagicMock(), mock.MagicMock()]
def test_ordination_presence(self):
id = uuid.uuid4()
priority = float(timeutils.utcnow().strftime('%s.%f'))
self.ordination.presence(id, priority)
- topic, msg = self.notified[0]
- self.assertEqual('alarm_partition_coordination', topic)
- self.assertEqual(id, msg['args']['data']['uuid'])
- self.assertEqual(priority, msg['args']['data']['priority'])
- self.assertEqual('presence', msg['method'])
+ method, args = self.notified[0]
+ self.assertEqual(id, args['data']['uuid'])
+ self.assertEqual(priority, args['data']['priority'])
+ self.assertEqual('presence', method)
def test_ordination_assign(self):
id = uuid.uuid4()
self.ordination.assign(id, self.alarms)
- topic, msg = self.notified[0]
- self.assertEqual('alarm_partition_coordination', topic)
- self.assertEqual(id, msg['args']['data']['uuid'])
- self.assertEqual(2, len(msg['args']['data']['alarms']))
- self.assertEqual('assign', msg['method'])
+ method, args = self.notified[0]
+ self.assertEqual(id, args['data']['uuid'])
+ self.assertEqual(2, len(args['data']['alarms']))
+ self.assertEqual('assign', method)
def test_ordination_allocate(self):
id = uuid.uuid4()
self.ordination.allocate(id, self.alarms)
- topic, msg = self.notified[0]
- self.assertEqual('alarm_partition_coordination', topic)
- self.assertEqual(id, msg['args']['data']['uuid'])
- self.assertEqual(2, len(msg['args']['data']['alarms']))
- self.assertEqual('allocate', msg['method'])
+ method, args = self.notified[0]
+ self.assertEqual(id, args['data']['uuid'])
+ self.assertEqual(2, len(args['data']['alarms']))
+ self.assertEqual('allocate', method)
diff --git a/ceilometer/tests/alarm/test_singleton_alarm_svc.py b/ceilometer/tests/alarm/test_singleton_alarm_svc.py
index 2055439c..7e3c1aed 100644
--- a/ceilometer/tests/alarm/test_singleton_alarm_svc.py
+++ b/ceilometer/tests/alarm/test_singleton_alarm_svc.py
@@ -24,12 +24,16 @@ from oslo.config import cfg
from stevedore import extension
from ceilometer.alarm import service
+from ceilometer import messaging
from ceilometer.openstack.common import test
class TestSingletonAlarmService(test.BaseTestCase):
def setUp(self):
super(TestSingletonAlarmService, self).setUp()
+ messaging.setup('fake://')
+ self.addCleanup(messaging.cleanup)
+
self.threshold_eval = mock.Mock()
self.evaluators = extension.ExtensionManager.make_test_instance(
[
diff --git a/ceilometer/tests/api/__init__.py b/ceilometer/tests/api/__init__.py
index eee701a1..8d0f2132 100644
--- a/ceilometer/tests/api/__init__.py
+++ b/ceilometer/tests/api/__init__.py
@@ -26,6 +26,7 @@ from six.moves import urllib
from ceilometer.api import acl
from ceilometer.api.v1 import app as v1_app
from ceilometer.api.v1 import blueprint as v1_blueprint
+from ceilometer import messaging
from ceilometer.openstack.common import jsonutils
from ceilometer import service
from ceilometer.tests import db as db_test_base
@@ -37,6 +38,8 @@ class TestBase(db_test_base.TestBase):
def setUp(self):
super(TestBase, self).setUp()
+ messaging.setup("fake://")
+ self.addCleanup(messaging.cleanup)
service.prepare_service([])
self.CONF.set_override("auth_version",
"v2.0", group=acl.OPT_GROUP_NAME)
@@ -83,6 +86,8 @@ class FunctionalTest(db_test_base.TestBase):
PATH_PREFIX = ''
def setUp(self):
+ messaging.setup("fake://")
+ self.addCleanup(messaging.cleanup)
super(FunctionalTest, self).setUp()
self.CONF.set_override("auth_version", "v2.0",
group=acl.OPT_GROUP_NAME)
diff --git a/ceilometer/tests/api/v1/test_app.py b/ceilometer/tests/api/v1/test_app.py
index fed5b8be..bb3f38c9 100644
--- a/ceilometer/tests/api/v1/test_app.py
+++ b/ceilometer/tests/api/v1/test_app.py
@@ -21,6 +21,7 @@ import os
from ceilometer.api import acl
from ceilometer.api.v1 import app
+from ceilometer import messaging
from ceilometer.openstack.common import fileutils
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import test
@@ -32,6 +33,8 @@ class TestApp(test.BaseTestCase):
def setUp(self):
super(TestApp, self).setUp()
self.CONF = self.useFixture(config.Config()).conf
+ messaging.setup('fake://')
+ self.addCleanup(messaging.cleanup)
def test_keystone_middleware_conf(self):
self.CONF.set_override("auth_protocol", "foottp",
diff --git a/ceilometer/tests/api/v2/test_alarm_scenarios.py b/ceilometer/tests/api/v2/test_alarm_scenarios.py
index 915ac364..4e26d291 100644
--- a/ceilometer/tests/api/v2/test_alarm_scenarios.py
+++ b/ceilometer/tests/api/v2/test_alarm_scenarios.py
@@ -29,6 +29,7 @@ import mock
from six import moves
+from ceilometer import messaging
from ceilometer.storage import models
from ceilometer.tests.api.v2 import FunctionalTest
from ceilometer.tests import db as tests_db
@@ -1740,17 +1741,17 @@ class TestAlarms(FunctionalTest,
}
}
- with mock.patch('ceilometer.openstack.common.notifier.api.notify') \
- as notifier:
+ with mock.patch.object(messaging, 'get_notifier') as get_notifier:
+ notifier = get_notifier.return_value
+
self.post_json('/alarms', params=json, headers=self.auth_headers)
+ get_notifier.assert_called_once_with(publisher_id='ceilometer.api')
- calls = notifier.call_args_list
+ calls = notifier.info.call_args_list
self.assertEqual(1, len(calls))
args, _ = calls[0]
- context, publisher, event_type, priority, payload = args
- self.assertTrue(publisher.startswith('ceilometer.api'))
+ context, event_type, payload = args
self.assertEqual('alarm.creation', event_type)
- self.assertEqual('INFO', priority)
self.assertEqual('sent_notification', payload['detail']['name'])
self.assertTrue(set(['alarm_id', 'detail', 'event_id', 'on_behalf_of',
'project_id', 'timestamp', 'type',
@@ -1759,18 +1760,18 @@ class TestAlarms(FunctionalTest,
def test_alarm_sends_notification(self):
# Hit the AlarmController (with alarm_id supplied) ...
data = self.get_json('/alarms')
- with mock.patch('ceilometer.openstack.common.notifier.api.notify') \
- as notifier:
+ with mock.patch.object(messaging, 'get_notifier') as get_notifier:
+ notifier = get_notifier.return_value
+
self.delete('/alarms/%s' % data[0]['alarm_id'],
headers=self.auth_headers, status=204)
+ get_notifier.assert_called_once_with(publisher_id='ceilometer.api')
- calls = notifier.call_args_list
+ calls = notifier.info.call_args_list
self.assertEqual(1, len(calls))
args, _ = calls[0]
- context, publisher, event_type, priority, payload = args
- self.assertTrue(publisher.startswith('ceilometer.api'))
+ context, event_type, payload = args
self.assertEqual('alarm.deletion', event_type)
- self.assertEqual('INFO', priority)
self.assertEqual('name1', payload['detail']['name'])
self.assertTrue(set(['alarm_id', 'detail', 'event_id', 'on_behalf_of',
'project_id', 'timestamp', 'type',
diff --git a/ceilometer/tests/api/v2/test_app.py b/ceilometer/tests/api/v2/test_app.py
index 9b4bda27..d58b20db 100644
--- a/ceilometer/tests/api/v2/test_app.py
+++ b/ceilometer/tests/api/v2/test_app.py
@@ -57,6 +57,7 @@ class TestApp(base.BaseTestCase):
def test_keystone_middleware_parse_conffile(self):
pipeline_conf = self.path_get("etc/ceilometer/pipeline.yaml")
content = "[DEFAULT]\n"\
+ "rpc_backend = fake\n"\
"pipeline_cfg_file = {0}\n"\
"[{1}]\n"\
"auth_protocol = barttp\n"\
diff --git a/ceilometer/tests/api/v2/test_post_samples_scenarios.py b/ceilometer/tests/api/v2/test_post_samples_scenarios.py
index 1236be12..925fdfcd 100644
--- a/ceilometer/tests/api/v2/test_post_samples_scenarios.py
+++ b/ceilometer/tests/api/v2/test_post_samples_scenarios.py
@@ -21,8 +21,9 @@
import copy
import datetime
-from ceilometer.openstack.common.fixture.mockpatch import PatchObject
-from ceilometer.openstack.common import rpc
+import mock
+
+from ceilometer.openstack.common.fixture import mockpatch
from ceilometer.openstack.common import timeutils
from ceilometer.tests.api.v2 import FunctionalTest
from ceilometer.tests import db as tests_db
@@ -30,16 +31,30 @@ from ceilometer.tests import db as tests_db
class TestPostSamples(FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):
-
- def fake_cast(self, context, topic, msg):
- for s in msg['args']['data']:
- del s['message_signature']
- self.published.append((topic, msg))
+ def fake_cast(self, ctxt, target, data):
+ for m in data:
+ del m['message_signature']
+ self.published.append(data)
+
+ def patch_publishers(self):
+ cast_ctxt = mock.Mock()
+ cast_ctxt.cast.side_effect = self.fake_cast
+ found = False
+ pipeline_hook = self.app.app.application.app.hooks[2]
+ for pipeline in pipeline_hook.pipeline_manager.pipelines:
+ for publisher in pipeline.publishers:
+ if hasattr(publisher, 'rpc_client'):
+ self.useFixture(mockpatch.PatchObject(
+ publisher.rpc_client, 'prepare',
+ return_value=cast_ctxt))
+ found = True
+ if not found:
+            raise Exception('failed to patch the rpc publisher')
def setUp(self):
super(TestPostSamples, self).setUp()
self.published = []
- self.useFixture(PatchObject(rpc, 'cast', side_effect=self.fake_cast))
+ self.patch_publishers()
def test_one(self):
s1 = [{'counter_name': 'apples',
@@ -51,7 +66,6 @@ class TestPostSamples(FunctionalTest,
'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff',
'resource_metadata': {'name1': 'value1',
'name2': 'value2'}}]
-
data = self.post_json('/meters/apples/', s1)
# timestamp not given so it is generated.
@@ -62,7 +76,7 @@ class TestPostSamples(FunctionalTest,
s1[0]['source'] = '%s:openstack' % s1[0]['project_id']
self.assertEqual(s1, data.json)
- self.assertEqual(s1[0], self.published[0][1]['args']['data'][0])
+ self.assertEqual(s1[0], self.published[0][0])
def test_nested_metadata(self):
s1 = [{'counter_name': 'apples',
@@ -92,7 +106,7 @@ class TestPostSamples(FunctionalTest,
# only the published sample should be unwound, not the representation
# in the API response
self.assertEqual(s1[0], data.json[0])
- self.assertEqual(unwound, self.published[0][1]['args']['data'][0])
+ self.assertEqual(unwound, self.published[0][0])
def test_invalid_counter_type(self):
s1 = [{'counter_name': 'my_counter_name',
@@ -194,12 +208,12 @@ class TestPostSamples(FunctionalTest,
c['timestamp'] = timestamp.replace(tzinfo=None).isoformat()
# do the same on the pipeline
- msg = self.published[0][1]['args']['data'][x]
+ msg = self.published[0][x]
timestamp = timeutils.parse_isotime(msg['timestamp'])
msg['timestamp'] = timestamp.replace(tzinfo=None).isoformat()
self.assertEqual(s, c)
- self.assertEqual(s, self.published[0][1]['args']['data'][x])
+ self.assertEqual(s, self.published[0][x])
def test_missing_mandatory_fields(self):
"""Do not accept posting samples with missing mandatory fields."""
@@ -259,7 +273,7 @@ class TestPostSamples(FunctionalTest,
s['timestamp'] = data.json[x]['timestamp']
s.setdefault('resource_metadata', dict())
self.assertEqual(s, data.json[x])
- self.assertEqual(s, self.published[0][1]['args']['data'][x])
+ self.assertEqual(s, self.published[0][x])
def test_multiple_samples_multiple_sources(self):
"""Do accept a single post with some multiples sources
@@ -307,7 +321,7 @@ class TestPostSamples(FunctionalTest,
s['timestamp'] = data.json[x]['timestamp']
s.setdefault('resource_metadata', dict())
self.assertEqual(s, data.json[x])
- self.assertEqual(s, self.published[0][1]['args']['data'][x])
+ self.assertEqual(s, self.published[0][x])
def test_missing_project_user_id(self):
"""Ensure missing project & user IDs are defaulted appropriately.
@@ -344,4 +358,4 @@ class TestPostSamples(FunctionalTest,
s['project_id'] = project_id
self.assertEqual(s, data.json[x])
- self.assertEqual(s, self.published[0][1]['args']['data'][x])
+ self.assertEqual(s, self.published[0][x])
diff --git a/ceilometer/tests/objectstore/test_swift_middleware.py b/ceilometer/tests/objectstore/test_swift_middleware.py
index 638b8f51..d3ac36b2 100644
--- a/ceilometer/tests/objectstore/test_swift_middleware.py
+++ b/ceilometer/tests/objectstore/test_swift_middleware.py
@@ -28,6 +28,7 @@ except ImportError:
import webob
REQUEST = webob
+from ceilometer import messaging
from ceilometer.objectstore import swift_middleware
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common.fixture.mockpatch import PatchObject
@@ -74,6 +75,8 @@ class TestSwiftMiddleware(test.BaseTestCase):
self.pipeline_manager = self._faux_pipeline_manager()
self.useFixture(PatchObject(pipeline, 'setup_pipeline',
side_effect=self._fake_setup_pipeline))
+ messaging.setup('fake://')
+ self.addCleanup(messaging.cleanup)
self.CONF = self.useFixture(config.Config()).conf
@staticmethod
diff --git a/ceilometer/tests/publisher/test_rpc_publisher.py b/ceilometer/tests/publisher/test_rpc_publisher.py
index d52a397f..f45e9f14 100644
--- a/ceilometer/tests/publisher/test_rpc_publisher.py
+++ b/ceilometer/tests/publisher/test_rpc_publisher.py
@@ -21,9 +21,11 @@
import datetime
import eventlet
-import fixtures
import mock
+import oslo.messaging
+import oslo.messaging._drivers.common
+from ceilometer import messaging
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import network_utils
from ceilometer.openstack.common import test
@@ -90,214 +92,249 @@ class TestPublish(test.BaseTestCase):
),
]
- def faux_cast(self, context, topic, msg):
- if self.rpc_unreachable:
- #note(sileht): Ugly, but when rabbitmq is unreachable
- # and rabbitmq_max_retries is not 0
- # oslo.rpc do a sys.exit(1), so we do the same
- # things here until this is fixed in oslo
- raise SystemExit(1)
- else:
- self.published.append((topic, msg))
-
def setUp(self):
super(TestPublish, self).setUp()
self.CONF = self.useFixture(config.Config()).conf
+
+ messaging.setup('fake://')
+ self.addCleanup(messaging.cleanup)
+
self.published = []
- self.rpc_unreachable = False
- self.useFixture(fixtures.MonkeyPatch(
- "ceilometer.openstack.common.rpc.cast",
- self.faux_cast))
def test_published(self):
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://'))
- publisher.publish_samples(None,
- self.test_data)
- self.assertEqual(1, len(self.published))
- self.assertEqual(self.CONF.publisher_rpc.metering_topic,
- self.published[0][0])
- self.assertIsInstance(self.published[0][1]['args']['data'], list)
- self.assertEqual('record_metering_data',
- self.published[0][1]['method'])
+ cast_context = mock.MagicMock()
+ with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
+ prepare.return_value = cast_context
+ publisher.publish_samples(mock.MagicMock(),
+ self.test_data)
+
+ prepare.assert_called_once_with(
+ topic=self.CONF.publisher_rpc.metering_topic)
+ cast_context.cast.assert_called_once_with(
+ mock.ANY, 'record_metering_data', data=mock.ANY)
def test_publish_target(self):
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?target=custom_procedure_call'))
- publisher.publish_samples(None,
- self.test_data)
- self.assertEqual(1, len(self.published))
- self.assertEqual(self.CONF.publisher_rpc.metering_topic,
- self.published[0][0])
- self.assertIsInstance(self.published[0][1]['args']['data'], list)
- self.assertEqual('custom_procedure_call',
- self.published[0][1]['method'])
+ cast_context = mock.MagicMock()
+ with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
+ prepare.return_value = cast_context
+ publisher.publish_samples(mock.MagicMock(),
+ self.test_data)
+
+ prepare.assert_called_once_with(
+ topic=self.CONF.publisher_rpc.metering_topic)
+ cast_context.cast.assert_called_once_with(
+ mock.ANY, 'custom_procedure_call', data=mock.ANY)
def test_published_with_per_meter_topic(self):
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?per_meter_topic=1'))
- publisher.publish_samples(None,
- self.test_data)
- self.assertEqual(4, len(self.published))
- for topic, rpc_call in self.published:
- meters = rpc_call['args']['data']
- self.assertIsInstance(meters, list)
- if topic != self.CONF.publisher_rpc.metering_topic:
- self.assertEqual(1, len(set(meter['counter_name']
- for meter in meters)),
- "Meter are published grouped by name")
-
- topics = [topic for topic, meter in self.published]
- self.assertIn(self.CONF.publisher_rpc.metering_topic, topics)
- self.assertIn(
- self.CONF.publisher_rpc.metering_topic + '.' + 'test', topics)
- self.assertIn(
- self.CONF.publisher_rpc.metering_topic + '.' + 'test2', topics)
- self.assertIn(
- self.CONF.publisher_rpc.metering_topic + '.' + 'test3', topics)
+ with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
+ publisher.publish_samples(mock.MagicMock(),
+ self.test_data)
+
+ class MeterGroupMatcher(object):
+ def __eq__(self, meters):
+ return len(set(meter['counter_name']
+ for meter in meters)) == 1
+
+ topic = self.CONF.publisher_rpc.metering_topic
+ expected = [mock.call(topic=topic),
+ mock.call().cast(mock.ANY, 'record_metering_data',
+ data=mock.ANY),
+ mock.call(topic=topic + '.test'),
+ mock.call().cast(mock.ANY, 'record_metering_data',
+ data=MeterGroupMatcher()),
+ mock.call(topic=topic + '.test2'),
+ mock.call().cast(mock.ANY, 'record_metering_data',
+ data=MeterGroupMatcher()),
+ mock.call(topic=topic + '.test3'),
+ mock.call().cast(mock.ANY, 'record_metering_data',
+ data=MeterGroupMatcher())]
+ self.assertEqual(expected, prepare.mock_calls)
def test_published_concurrency(self):
"""This test the concurrent access to the local queue
of the rpc publisher
"""
- def faux_cast_go(context, topic, msg):
- self.published.append((topic, msg))
+ publisher = rpc.RPCPublisher(network_utils.urlsplit('rpc://'))
+ cast_context = mock.MagicMock()
- def faux_cast_wait(context, topic, msg):
- self.useFixture(fixtures.MonkeyPatch(
- "ceilometer.openstack.common.rpc.cast",
- faux_cast_go))
- # Sleep to simulate concurrency and allow other threads to work
- eventlet.sleep(0)
- self.published.append((topic, msg))
+ with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
+ def fake_prepare_go(topic):
+ return cast_context
- self.useFixture(fixtures.MonkeyPatch(
- "ceilometer.openstack.common.rpc.cast",
- faux_cast_wait))
+ def fake_prepare_wait(topic):
+ prepare.side_effect = fake_prepare_go
+ # Sleep to simulate concurrency and allow other threads to work
+ eventlet.sleep(0)
+ return cast_context
- publisher = rpc.RPCPublisher(network_utils.urlsplit('rpc://'))
- job1 = eventlet.spawn(publisher.publish_samples, None, self.test_data)
- job2 = eventlet.spawn(publisher.publish_samples, None, self.test_data)
+ prepare.side_effect = fake_prepare_wait
+
+ job1 = eventlet.spawn(publisher.publish_samples,
+ mock.MagicMock(), self.test_data)
+ job2 = eventlet.spawn(publisher.publish_samples,
+ mock.MagicMock(), self.test_data)
- job1.wait()
- job2.wait()
+ job1.wait()
+ job2.wait()
self.assertEqual('default', publisher.policy)
- self.assertEqual(2, len(self.published))
+ self.assertEqual(2, len(cast_context.cast.mock_calls))
self.assertEqual(0, len(publisher.local_queue))
@mock.patch('ceilometer.publisher.rpc.LOG')
def test_published_with_no_policy(self, mylog):
- self.rpc_unreachable = True
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://'))
- self.assertTrue(mylog.info.called)
+ side_effect = oslo.messaging._drivers.common.RPCException()
+ with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
+ prepare.side_effect = side_effect
- self.assertRaises(
- SystemExit,
- publisher.publish_samples,
- None, self.test_data)
- self.assertEqual('default', publisher.policy)
- self.assertEqual(0, len(self.published))
- self.assertEqual(0, len(publisher.local_queue))
+ self.assertRaises(
+ oslo.messaging._drivers.common.RPCException,
+ publisher.publish_samples,
+ mock.MagicMock(), self.test_data)
+ self.assertTrue(mylog.info.called)
+ self.assertEqual('default', publisher.policy)
+ self.assertEqual(0, len(publisher.local_queue))
+ prepare.assert_called_once_with(
+ topic=self.CONF.publisher_rpc.metering_topic)
@mock.patch('ceilometer.publisher.rpc.LOG')
def test_published_with_policy_block(self, mylog):
- self.rpc_unreachable = True
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=default'))
- self.assertTrue(mylog.info.called)
- self.assertRaises(
- SystemExit,
- publisher.publish_samples,
- None, self.test_data)
- self.assertEqual(0, len(self.published))
- self.assertEqual(0, len(publisher.local_queue))
+ side_effect = oslo.messaging._drivers.common.RPCException()
+ with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
+ prepare.side_effect = side_effect
+ self.assertRaises(
+ oslo.messaging._drivers.common.RPCException,
+ publisher.publish_samples,
+ mock.MagicMock(), self.test_data)
+ self.assertTrue(mylog.info.called)
+ self.assertEqual(0, len(publisher.local_queue))
+ prepare.assert_called_once_with(
+ topic=self.CONF.publisher_rpc.metering_topic)
@mock.patch('ceilometer.publisher.rpc.LOG')
def test_published_with_policy_incorrect(self, mylog):
- self.rpc_unreachable = True
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=notexist'))
- self.assertRaises(
- SystemExit,
- publisher.publish_samples,
- None, self.test_data)
- self.assertTrue(mylog.warn.called)
- self.assertEqual('default', publisher.policy)
- self.assertEqual(0, len(self.published))
- self.assertEqual(0, len(publisher.local_queue))
+ side_effect = oslo.messaging._drivers.common.RPCException()
+ with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
+ prepare.side_effect = side_effect
+ self.assertRaises(
+ oslo.messaging._drivers.common.RPCException,
+ publisher.publish_samples,
+ mock.MagicMock(), self.test_data)
+ self.assertTrue(mylog.warn.called)
+ self.assertEqual('default', publisher.policy)
+ self.assertEqual(0, len(publisher.local_queue))
+ prepare.assert_called_once_with(
+ topic=self.CONF.publisher_rpc.metering_topic)
def test_published_with_policy_drop_and_rpc_down(self):
- self.rpc_unreachable = True
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=drop'))
- publisher.publish_samples(None,
- self.test_data)
- self.assertEqual(0, len(self.published))
- self.assertEqual(0, len(publisher.local_queue))
+ side_effect = oslo.messaging._drivers.common.RPCException()
+ with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
+ prepare.side_effect = side_effect
+ publisher.publish_samples(mock.MagicMock(),
+ self.test_data)
+ self.assertEqual(0, len(publisher.local_queue))
+ prepare.assert_called_once_with(
+ topic=self.CONF.publisher_rpc.metering_topic)
def test_published_with_policy_queue_and_rpc_down(self):
- self.rpc_unreachable = True
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=queue'))
- publisher.publish_samples(None,
- self.test_data)
- self.assertEqual(0, len(self.published))
- self.assertEqual(1, len(publisher.local_queue))
+ side_effect = oslo.messaging._drivers.common.RPCException()
+ with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
+ prepare.side_effect = side_effect
+
+ publisher.publish_samples(mock.MagicMock(),
+ self.test_data)
+ self.assertEqual(1, len(publisher.local_queue))
+ prepare.assert_called_once_with(
+ topic=self.CONF.publisher_rpc.metering_topic)
def test_published_with_policy_queue_and_rpc_down_up(self):
self.rpc_unreachable = True
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=queue'))
- publisher.publish_samples(None,
- self.test_data)
- self.assertEqual(0, len(self.published))
- self.assertEqual(1, len(publisher.local_queue))
- self.rpc_unreachable = False
- publisher.publish_samples(None,
- self.test_data)
+ side_effect = oslo.messaging._drivers.common.RPCException()
+ with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
+ prepare.side_effect = side_effect
+ publisher.publish_samples(mock.MagicMock(),
+ self.test_data)
- self.assertEqual(2, len(self.published))
- self.assertEqual(0, len(publisher.local_queue))
+ self.assertEqual(1, len(publisher.local_queue))
+
+ prepare.side_effect = mock.MagicMock()
+ publisher.publish_samples(mock.MagicMock(),
+ self.test_data)
+
+ self.assertEqual(0, len(publisher.local_queue))
+
+ topic = self.CONF.publisher_rpc.metering_topic
+ expected = [mock.call(topic=topic),
+ mock.call(topic=topic),
+ mock.call(topic=topic)]
+ self.assertEqual(expected, prepare.mock_calls)
def test_published_with_policy_sized_queue_and_rpc_down(self):
- self.rpc_unreachable = True
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=queue&max_queue_length=3'))
- for i in range(5):
- for s in self.test_data:
- s.source = 'test-%d' % i
- publisher.publish_samples(None,
- self.test_data)
- self.assertEqual(0, len(self.published))
+
+ side_effect = oslo.messaging._drivers.common.RPCException()
+ with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
+ prepare.side_effect = side_effect
+ for i in range(0, 5):
+ for s in self.test_data:
+ s.source = 'test-%d' % i
+ publisher.publish_samples(mock.MagicMock(),
+ self.test_data)
+
self.assertEqual(3, len(publisher.local_queue))
- self.assertEqual('test-2',
- publisher.local_queue[0][2]['args']['data'][0]
- ['source'])
- self.assertEqual('test-3',
- publisher.local_queue[1][2]['args']['data'][0]
- ['source'])
- self.assertEqual('test-4',
- publisher.local_queue[2][2]['args']['data'][0]
- ['source'])
+ self.assertEqual(
+ 'test-2',
+ publisher.local_queue[0][2][0]['source']
+ )
+ self.assertEqual(
+ 'test-3',
+ publisher.local_queue[1][2][0]['source']
+ )
+ self.assertEqual(
+ 'test-4',
+ publisher.local_queue[2][2][0]['source']
+ )
def test_published_with_policy_default_sized_queue_and_rpc_down(self):
- self.rpc_unreachable = True
publisher = rpc.RPCPublisher(
network_utils.urlsplit('rpc://?policy=queue'))
- for i in range(2000):
- for s in self.test_data:
- s.source = 'test-%d' % i
- publisher.publish_samples(None,
- self.test_data)
- self.assertEqual(0, len(self.published))
+
+ side_effect = oslo.messaging._drivers.common.RPCException()
+ with mock.patch.object(publisher.rpc_client, 'prepare') as prepare:
+ prepare.side_effect = side_effect
+ for i in range(0, 2000):
+ for s in self.test_data:
+ s.source = 'test-%d' % i
+ publisher.publish_samples(mock.MagicMock(),
+ self.test_data)
+
self.assertEqual(1024, len(publisher.local_queue))
- self.assertEqual('test-976',
- publisher.local_queue[0][2]['args']['data'][0]
- ['source'])
- self.assertEqual('test-1999',
- publisher.local_queue[1023][2]['args']['data'][0]
- ['source'])
+ self.assertEqual(
+ 'test-976',
+ publisher.local_queue[0][2][0]['source']
+ )
+ self.assertEqual(
+ 'test-1999',
+ publisher.local_queue[1023][2][0]['source']
+ )
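
The rewritten publisher tests assert on rpc_client.prepare(topic=...) followed by cast(ctxt, 'record_metering_data', data=...), and on local_queue entries whose third element is the meter list. The class below is a hedged sketch of the basic publish-and-queue path consistent with those assertions, not the actual ceilometer/publisher/rpc.py code; the serialization step is simplified away.

class QueueingRPCPublisherSketch(object):
    """Not the real RPCPublisher; only the calls the tests above assert on."""

    def __init__(self, rpc_client, topic='metering', policy='default',
                 max_queue_length=1024):
        self.rpc_client = rpc_client        # an oslo.messaging.RPCClient
        self.topic = topic                  # publisher_rpc.metering_topic
        self.policy = policy
        self.max_queue_length = max_queue_length
        self.local_queue = []

    def publish_samples(self, context, samples):
        # the real publisher signs and serializes each sample first;
        # treat them as ready-made dicts for this sketch
        meters = list(samples)
        try:
            self.rpc_client.prepare(topic=self.topic).cast(
                context, 'record_metering_data', data=meters)
        except Exception:
            if self.policy == 'queue':
                # entries keep the meter list at index 2, which is why the
                # tests read local_queue[x][2][0]['source']; the real publisher
                # also retries anything already queued, which this sketch omits
                self.local_queue.append((context, self.topic, meters))
                self.local_queue = self.local_queue[-self.max_queue_length:]
            elif self.policy == 'drop':
                pass  # silently discard the samples
            else:
                raise  # 'default' (and unknown policies) re-raise, per the tests
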
diff --git a/ceilometer/tests/test_bin.py b/ceilometer/tests/test_bin.py
index 0b9a5563..00b4b7e1 100644
--- a/ceilometer/tests/test_bin.py
+++ b/ceilometer/tests/test_bin.py
@@ -33,7 +33,9 @@ from ceilometer.tests import base
class BinTestCase(base.BaseTestCase):
def setUp(self):
super(BinTestCase, self).setUp()
- content = ("[database]\n"
+ content = ("[DEFAULT]\n"
+ "rpc_backend=fake\n"
+ "[database]\n"
"connection=log://localhost\n")
self.tempfile = fileutils.write_to_tempfile(content=content,
prefix='ceilometer',
@@ -58,7 +60,9 @@ class BinTestCase(base.BaseTestCase):
self.assertIn("Nothing to clean", err)
def test_run_expirer_ttl_enabled(self):
- content = ("[database]\n"
+ content = ("[DEFAULT]\n"
+ "rpc_backend=fake\n"
+ "[database]\n"
"time_to_live=1\n"
"connection=log://localhost\n")
self.tempfile = fileutils.write_to_tempfile(content=content,
@@ -78,7 +82,7 @@ class BinSendSampleTestCase(base.BaseTestCase):
super(BinSendSampleTestCase, self).setUp()
pipeline_cfg_file = self.path_get('etc/ceilometer/pipeline.yaml')
content = "[DEFAULT]\n"\
- "rpc_backend=ceilometer.openstack.common.rpc.impl_fake\n"\
+ "rpc_backend=fake\n"\
"pipeline_cfg_file={0}\n".format(pipeline_cfg_file)
self.tempfile = fileutils.write_to_tempfile(content=content,
@@ -106,7 +110,7 @@ class BinApiTestCase(base.BaseTestCase):
pipeline_cfg_file = self.path_get('etc/ceilometer/pipeline.yaml')
policy_file = self.path_get('etc/ceilometer/policy.json')
content = "[DEFAULT]\n"\
- "rpc_backend=ceilometer.openstack.common.rpc.impl_fake\n"\
+ "rpc_backend=fake\n"\
"auth_strategy=noauth\n"\
"debug=true\n"\
"pipeline_cfg_file={0}\n"\
diff --git a/ceilometer/tests/test_collector.py b/ceilometer/tests/test_collector.py
index 20bfe664..7b5ba798 100644
--- a/ceilometer/tests/test_collector.py
+++ b/ceilometer/tests/test_collector.py
@@ -17,12 +17,14 @@
# under the License.
import socket
+import eventlet
import mock
-from mock import patch
import msgpack
+import oslo.messaging
from stevedore import extension
from ceilometer import collector
+from ceilometer import messaging
from ceilometer.openstack.common.fixture import config
from ceilometer import sample
from ceilometer.tests import base as tests_base
@@ -36,9 +38,11 @@ class FakeConnection():
class TestCollector(tests_base.BaseTestCase):
def setUp(self):
super(TestCollector, self).setUp()
+ messaging.setup('fake://')
+ self.addCleanup(messaging.cleanup)
self.CONF = self.useFixture(config.Config()).conf
self.CONF.set_override("connection", "log://", group='database')
- self.srv = collector.CollectorService('the-host', 'the-topic')
+ self.srv = collector.CollectorService()
self.counter = sample.Sample(
name='foobar',
type='bad',
@@ -81,13 +85,12 @@ class TestCollector(tests_base.BaseTestCase):
def test_record_metering_data(self):
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher)
-
self.srv.record_metering_data(None, self.counter)
-
mock_dispatcher.record_metering_data.assert_called_once_with(
data=self.counter)
def test_udp_receive(self):
+ self.CONF.set_override('rpc_backend', '')
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher)
self.counter['source'] = 'mysource'
@@ -97,7 +100,7 @@ class TestCollector(tests_base.BaseTestCase):
self.counter['counter_unit'] = self.counter['unit']
udp_socket = self._make_fake_socket()
- with patch('socket.socket', return_value=udp_socket):
+ with mock.patch('socket.socket', return_value=udp_socket):
self.srv.start_udp()
self._verify_udp_socket(udp_socket)
@@ -106,6 +109,7 @@ class TestCollector(tests_base.BaseTestCase):
self.counter)
def test_udp_receive_storage_error(self):
+ self.CONF.set_override('rpc_backend', '')
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = self._make_test_manager(mock_dispatcher)
mock_dispatcher.record_metering_data.side_effect = self._raise_error
@@ -117,7 +121,7 @@ class TestCollector(tests_base.BaseTestCase):
self.counter['counter_unit'] = self.counter['unit']
udp_socket = self._make_fake_socket()
- with patch('socket.socket', return_value=udp_socket):
+ with mock.patch('socket.socket', return_value=udp_socket):
self.srv.start_udp()
self._verify_udp_socket(udp_socket)
@@ -130,42 +134,37 @@ class TestCollector(tests_base.BaseTestCase):
raise Exception
def test_udp_receive_bad_decoding(self):
+ self.CONF.set_override('rpc_backend', '')
udp_socket = self._make_fake_socket()
- with patch('socket.socket', return_value=udp_socket):
- with patch('msgpack.loads', self._raise_error):
+ with mock.patch('socket.socket', return_value=udp_socket):
+ with mock.patch('msgpack.loads', self._raise_error):
self.srv.start_udp()
self._verify_udp_socket(udp_socket)
- @patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
- @patch('ceilometer.event.converter.setup_events', mock.MagicMock())
- def test_init_host(self):
- # If we try to create a real RPC connection, init_host() never
- # returns. Mock it out so we can establish the service
- # configuration.
- with patch('ceilometer.openstack.common.rpc.create_connection'):
- self.srv.start()
-
- def test_only_udp(self):
+ @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start')
+ @mock.patch.object(collector.CollectorService, 'start_udp')
+ def test_only_udp(self, udp_start, rpc_start):
"""Check that only UDP is started if rpc_backend is empty."""
self.CONF.set_override('rpc_backend', '')
udp_socket = self._make_fake_socket()
- with patch('socket.socket', return_value=udp_socket):
+ with mock.patch('socket.socket', return_value=udp_socket):
self.srv.start()
- def test_only_rpc(self):
+        # UDP runs in its own thread, so we need to sleep to let
+        # the thread start
+ eventlet.sleep(0)
+ self.assertEqual(0, rpc_start.call_count)
+ self.assertEqual(1, udp_start.call_count)
+
+ @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start')
+ @mock.patch.object(collector.CollectorService, 'start_udp')
+ def test_only_rpc(self, udp_start, rpc_start):
"""Check that only RPC is started if udp_address is empty."""
self.CONF.set_override('udp_address', '', group='collector')
- with patch('ceilometer.openstack.common.rpc.create_connection'):
- self.srv.start()
-
- @patch.object(FakeConnection, 'create_worker')
- @patch('ceilometer.openstack.common.rpc.dispatcher.RpcDispatcher')
- def test_initialize_service_hook_conf_opt(self, mock_dispatcher,
- mock_worker):
- self.CONF.set_override('metering_topic', 'mytopic',
- group='publisher_rpc')
- self.srv.conn = FakeConnection()
- self.srv.initialize_service_hook(mock.MagicMock())
- mock_worker.assert_called_once_with('mytopic', mock_dispatcher(),
- 'ceilometer.collector.mytopic')
+ self.srv.start()
+        # UDP runs in its own thread, so we need to sleep to let
+        # the thread start
+ eventlet.sleep(0)
+ self.assertEqual(1, rpc_start.call_count)
+ self.assertEqual(0, udp_start.call_count)
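
The collector tests now mock oslo.messaging.MessageHandlingServer.start and CollectorService.start_udp and check which one runs depending on rpc_backend and collector.udp_address. A rough sketch of startup logic consistent with those assertions, with method and attribute names assumed (only the oslo.messaging calls are the library's real API):

import eventlet
from oslo.config import cfg
import oslo.messaging

class CollectorServiceSketch(object):
    """Startup behaviour the tests above assert on; names are assumptions."""

    def __init__(self, transport, host='the-host'):
        self.transport = transport
        self.host = host
        self.rpc_server = None

    def start(self):
        if cfg.CONF.collector.udp_address:
            # the UDP listener runs in its own green thread, hence the
            # eventlet.sleep(0) in the tests before checking call counts
            eventlet.spawn(self.start_udp)
        if cfg.CONF.rpc_backend:
            target = oslo.messaging.Target(
                topic=cfg.CONF.publisher_rpc.metering_topic, server=self.host)
            self.rpc_server = oslo.messaging.get_rpc_server(
                self.transport, target, [self], executor='eventlet')
            self.rpc_server.start()

    def start_udp(self):
        pass  # bind the UDP socket and loop over msgpack-decoded samples

    def record_metering_data(self, ctxt, data):
        pass  # forwarded to the configured dispatchers in the real service
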
diff --git a/ceilometer/tests/test_middleware.py b/ceilometer/tests/test_middleware.py
index be15ec89..992ab649 100644
--- a/ceilometer/tests/test_middleware.py
+++ b/ceilometer/tests/test_middleware.py
@@ -97,6 +97,6 @@ class TestNotifications(test.BaseTestCase):
['HTTP_X_SERVICE_NAME'], sample.resource_id)
self.assertEqual(1, sample.volume)
- def test_exchanges(self):
- topics = middleware.HTTPRequest().get_exchange_topics(self.CONF)
- self.assertEqual(4, len(topics))
+ def test_targets(self):
+ targets = middleware.HTTPRequest().get_targets(self.CONF)
+ self.assertEqual(4, len(targets))
diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py
index 1ac69096..cc369056 100644
--- a/ceilometer/tests/test_notification.py
+++ b/ceilometer/tests/test_notification.py
@@ -19,71 +19,77 @@
import mock
+import oslo.messaging
from stevedore import extension
from ceilometer.compute.notifications import instance
+from ceilometer import messaging
from ceilometer import notification
from ceilometer.openstack.common.fixture import config
from ceilometer.storage import models
from ceilometer.tests import base as tests_base
-TEST_NOTICE = {
- u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
- u'_context_is_admin': True,
- u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e',
- u'_context_quota_class': None,
- u'_context_read_deleted': u'no',
- u'_context_remote_address': u'10.0.2.15',
- u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
- u'_context_roles': [u'admin'],
- u'_context_timestamp': u'2012-05-08T20:23:41.425105',
- u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2',
- u'event_type': u'compute.instance.create.end',
+TEST_NOTICE_CTXT = {
+ u'auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
+ u'is_admin': True,
+ u'project_id': u'7c150a59fe714e6f9263774af9688f0e',
+ u'quota_class': None,
+ u'read_deleted': u'no',
+ u'remote_address': u'10.0.2.15',
+ u'request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
+ u'roles': [u'admin'],
+ u'timestamp': u'2012-05-08T20:23:41.425105',
+ u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
+}
+
+TEST_NOTICE_METADATA = {
u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
- u'payload': {u'created_at': u'2012-05-08 20:23:41',
- u'deleted_at': u'',
- u'disk_gb': 0,
- u'display_name': u'testme',
- u'fixed_ips': [{u'address': u'10.0.0.2',
- u'floating_ips': [],
- u'meta': {},
- u'type': u'fixed',
- u'version': 4}],
- u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
- u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
- u'instance_type': u'm1.tiny',
- u'instance_type_id': 2,
- u'launched_at': u'2012-05-08 20:23:47.985999',
- u'memory_mb': 512,
- u'state': u'active',
- u'state_description': u'',
- u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
- u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
- u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
- u'vcpus': 1,
- u'root_gb': 0,
- u'ephemeral_gb': 0,
- u'host': u'compute-host-name',
- u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
- u'os_type': u'linux?',
- u'architecture': u'x86',
- u'image_ref': u'UUID',
- u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
- u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
- },
- u'priority': u'INFO',
- u'publisher_id': u'compute.vagrant-precise',
u'timestamp': u'2012-05-08 20:23:48.028195',
}
+TEST_NOTICE_PAYLOAD = {
+ u'created_at': u'2012-05-08 20:23:41',
+ u'deleted_at': u'',
+ u'disk_gb': 0,
+ u'display_name': u'testme',
+ u'fixed_ips': [{u'address': u'10.0.0.2',
+ u'floating_ips': [],
+ u'meta': {},
+ u'type': u'fixed',
+ u'version': 4}],
+ u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
+ u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
+ u'instance_type': u'm1.tiny',
+ u'instance_type_id': 2,
+ u'launched_at': u'2012-05-08 20:23:47.985999',
+ u'memory_mb': 512,
+ u'state': u'active',
+ u'state_description': u'',
+ u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
+ u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
+ u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
+ u'vcpus': 1,
+ u'root_gb': 0,
+ u'ephemeral_gb': 0,
+ u'host': u'compute-host-name',
+ u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
+ u'os_type': u'linux?',
+ u'architecture': u'x86',
+ u'image_ref': u'UUID',
+ u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
+ u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
+}
+
class TestNotification(tests_base.BaseTestCase):
def setUp(self):
super(TestNotification, self).setUp()
- self.srv = notification.NotificationService('the-host', 'the-topic')
self.CONF = self.useFixture(config.Config()).conf
+ messaging.setup('fake://')
+ self.addCleanup(messaging.cleanup)
self.CONF.set_override("connection", "log://", group='database')
+ self.srv = notification.NotificationService()
def _make_test_manager(self, plugin):
return extension.ExtensionManager.make_test_instance(
@@ -97,18 +103,23 @@ class TestNotification(tests_base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
@mock.patch('ceilometer.event.converter.setup_events', mock.MagicMock())
+ @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start',
+ mock.MagicMock())
def test_process_notification(self):
# If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the service
# configuration.
self.CONF.set_override("store_events", False, group="notification")
- with mock.patch('ceilometer.openstack.common.rpc.create_connection'):
- self.srv.start()
+ self.srv.start()
self.srv.pipeline_manager.pipelines[0] = mock.MagicMock()
self.srv.notification_manager = self._make_test_manager(
instance.Instance()
)
- self.srv.process_notification(TEST_NOTICE)
+
+ self.srv.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise',
+ 'compute.instance.create.end',
+ TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA)
+
self.assertTrue(
self.srv.pipeline_manager.publisher.called)
@@ -142,6 +153,8 @@ class TestNotification(tests_base.BaseTestCase):
def test_message_to_event_bad_event(self):
self.CONF.set_override("store_events", True, group="notification")
+ self.CONF.set_override("ack_on_event_error", False,
+ group="notification")
mock_dispatcher = mock.MagicMock()
self.srv.event_converter = mock.MagicMock()
self.srv.event_converter.to_event.return_value = mock.MagicMock(
@@ -150,5 +163,5 @@ class TestNotification(tests_base.BaseTestCase):
mock_dispatcher.record_events.return_value = [
(models.Event.UNKNOWN_PROBLEM, object())]
message = {'event_type': "foo", 'message_id': "abc"}
- self.assertRaises(notification.UnableToSaveEventException,
- self.srv._message_to_event, message)
+ ret = self.srv._message_to_event(message)
+ self.assertEqual(oslo.messaging.NotificationResult.REQUEUE, ret)
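
The notification service is now driven through an oslo.messaging notification endpoint: the test calls srv.info() with the split (ctxt, publisher_id, event_type, payload, metadata) arguments and expects NotificationResult.REQUEUE when an event cannot be saved and ack_on_event_error is off. A sketch of that endpoint shape, with helper names assumed:

import oslo.messaging

class NotificationEndpointSketch(object):
    """Shape of the handler the test drives via srv.info(...); names assumed."""

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        # oslo.messaging splits the old flat notification dict into these
        # five arguments; rebuild whatever the pipelines and dispatchers expect
        notification = dict(metadata, publisher_id=publisher_id,
                            event_type=event_type, payload=payload)
        if not self._save_event(notification):   # helper name assumed
            # mirrors ack_on_event_error=False: ask the broker to redeliver
            return oslo.messaging.NotificationResult.REQUEUE
        return oslo.messaging.NotificationResult.HANDLED

    def _save_event(self, notification):
        return True

# The service wires endpoints like this to the Targets returned by each
# plugin's get_targets(), e.g. via oslo.messaging.get_notification_listener().
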
diff --git a/ceilometer/tests/test_plugin.py b/ceilometer/tests/test_plugin.py
index 9a031ed5..4f206444 100644
--- a/ceilometer/tests/test_plugin.py
+++ b/ceilometer/tests/test_plugin.py
@@ -16,6 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import test
from ceilometer import plugin
@@ -71,6 +72,10 @@ TEST_NOTIFICATION = {
class NotificationBaseTestCase(test.BaseTestCase):
+ def setUp(self):
+ super(NotificationBaseTestCase, self).setUp()
+ self.CONF = self.useFixture(config.Config()).conf
+
def test_handle_event_type(self):
self.assertFalse(plugin.NotificationBase._handle_event_type(
'compute.instance.start', ['compute']))
@@ -91,7 +96,10 @@ class NotificationBaseTestCase(test.BaseTestCase):
class FakePlugin(plugin.NotificationBase):
def get_exchange_topics(self, conf):
- return
+ return [plugin.ExchangeTopics(exchange="exchange1",
+ topics=["t1", "t2"]),
+ plugin.ExchangeTopics(exchange="exchange2",
+ topics=['t3'])]
def process_notification(self, message):
return message
@@ -107,3 +115,13 @@ class NotificationBaseTestCase(test.BaseTestCase):
n = self.FakeNetworkPlugin()
self.assertTrue(len(list(c.to_samples(TEST_NOTIFICATION))) > 0)
self.assertEqual(0, len(list(n.to_samples(TEST_NOTIFICATION))))
+
+ def test_get_targets_compat(self):
+ targets = self.FakeComputePlugin().get_targets(self.CONF)
+ self.assertEqual(3, len(targets))
+ self.assertEqual('t1', targets[0].topic)
+ self.assertEqual('exchange1', targets[0].exchange)
+ self.assertEqual('t2', targets[1].topic)
+ self.assertEqual('exchange1', targets[1].exchange)
+ self.assertEqual('t3', targets[2].topic)
+ self.assertEqual('exchange2', targets[2].exchange)
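
The compat test above implies that NotificationBase gains a default get_targets() that flattens the legacy get_exchange_topics() result into one oslo.messaging.Target per topic. A plausible sketch of that default (the real implementation is in ceilometer/plugin.py, which this hunk only tests):

import oslo.messaging

# Intended as a method of plugin.NotificationBase.
def get_targets(self, conf):
    targets = []
    for exchange_topic in self.get_exchange_topics(conf):
        for topic in exchange_topic.topics:
            targets.append(oslo.messaging.Target(
                exchange=exchange_topic.exchange, topic=topic))
    return targets
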
diff --git a/ceilometer/volume/notifications.py b/ceilometer/volume/notifications.py
index 9947e195..79bd7063 100644
--- a/ceilometer/volume/notifications.py
+++ b/ceilometer/volume/notifications.py
@@ -20,6 +20,7 @@ events.
"""
from oslo.config import cfg
+import oslo.messaging
from ceilometer import plugin
from ceilometer import sample
@@ -46,16 +47,13 @@ class _Base(plugin.NotificationBase):
]
@staticmethod
- def get_exchange_topics(conf):
- """Return a sequence of ExchangeTopics defining the exchange and
+ def get_targets(conf):
+ """Return a sequence of oslo.messaging.Target defining the exchange and
topics to be connected for this plugin.
"""
- return [
- plugin.ExchangeTopics(
- exchange=conf.cinder_control_exchange,
- topics=set(topic + ".info"
- for topic in conf.notification_topics)),
- ]
+ return [oslo.messaging.Target(topic=topic,
+ exchange=conf.cinder_control_exchange)
+ for topic in conf.notification_topics]
class Volume(_Base):
diff --git a/etc/ceilometer/ceilometer.conf.sample b/etc/ceilometer/ceilometer.conf.sample
index 2ead0b57..784c741a 100644
--- a/etc/ceilometer/ceilometer.conf.sample
+++ b/etc/ceilometer/ceilometer.conf.sample
@@ -1,6 +1,200 @@
[DEFAULT]
#
+# Options defined in oslo.messaging
+#
+
+# Use durable queues in amqp. (boolean value)
+# Deprecated group/name - [DEFAULT]/rabbit_durable_queues
+#amqp_durable_queues=false
+
+# Auto-delete queues in amqp. (boolean value)
+#amqp_auto_delete=false
+
+# Size of RPC connection pool. (integer value)
+#rpc_conn_pool_size=30
+
+# Modules of exceptions that are permitted to be recreated
+# upon receiving exception data from an rpc call. (list value)
+#allowed_rpc_exception_modules=oslo.messaging.exceptions,nova.exception,cinder.exception,exceptions
+
+# Qpid broker hostname. (string value)
+#qpid_hostname=localhost
+
+# Qpid broker port. (integer value)
+#qpid_port=5672
+
+# Qpid HA cluster host:port pairs. (list value)
+#qpid_hosts=$qpid_hostname:$qpid_port
+
+# Username for Qpid connection. (string value)
+#qpid_username=
+
+# Password for Qpid connection. (string value)
+#qpid_password=
+
+# Space separated list of SASL mechanisms to use for auth.
+# (string value)
+#qpid_sasl_mechanisms=
+
+# Seconds between connection keepalive heartbeats. (integer
+# value)
+#qpid_heartbeat=60
+
+# Transport to use, either 'tcp' or 'ssl'. (string value)
+#qpid_protocol=tcp
+
+# Whether to disable the Nagle algorithm. (boolean value)
+#qpid_tcp_nodelay=true
+
+# The qpid topology version to use. Version 1 is what was
+# originally used by impl_qpid. Version 2 includes some
+# backwards-incompatible changes that allow broker federation
+# to work. Users should update to version 2 when they are
+# able to take everything down, as it requires a clean break.
+# (integer value)
+#qpid_topology_version=1
+
+# SSL version to use (valid only if SSL enabled). valid values
+# are TLSv1, SSLv23 and SSLv3. SSLv2 may be available on some
+# distributions. (string value)
+#kombu_ssl_version=
+
+# SSL key file (valid only if SSL enabled). (string value)
+#kombu_ssl_keyfile=
+
+# SSL cert file (valid only if SSL enabled). (string value)
+#kombu_ssl_certfile=
+
+# SSL certification authority file (valid only if SSL
+# enabled). (string value)
+#kombu_ssl_ca_certs=
+
+# How long to wait before reconnecting in response to an AMQP
+# consumer cancel notification. (floating point value)
+#kombu_reconnect_delay=1.0
+
+# The RabbitMQ broker address where a single node is used.
+# (string value)
+#rabbit_host=localhost
+
+# The RabbitMQ broker port where a single node is used.
+# (integer value)
+#rabbit_port=5672
+
+# RabbitMQ HA cluster host:port pairs. (list value)
+#rabbit_hosts=$rabbit_host:$rabbit_port
+
+# Connect over SSL for RabbitMQ. (boolean value)
+#rabbit_use_ssl=false
+
+# The RabbitMQ userid. (string value)
+#rabbit_userid=guest
+
+# The RabbitMQ password. (string value)
+#rabbit_password=guest
+
+# the RabbitMQ login method (string value)
+#rabbit_login_method=AMQPLAIN
+
+# The RabbitMQ virtual host. (string value)
+#rabbit_virtual_host=/
+
+# How frequently to retry connecting with RabbitMQ. (integer
+# value)
+#rabbit_retry_interval=1
+
+# How long to backoff for between retries when connecting to
+# RabbitMQ. (integer value)
+#rabbit_retry_backoff=2
+
+# Maximum number of RabbitMQ connection retries. Default is 0
+# (infinite retry count). (integer value)
+#rabbit_max_retries=0
+
+# Use HA queues in RabbitMQ (x-ha-policy: all). If you change
+# this option, you must wipe the RabbitMQ database. (boolean
+# value)
+#rabbit_ha_queues=false
+
+# If passed, use a fake RabbitMQ provider. (boolean value)
+#fake_rabbit=false
+
+# ZeroMQ bind address. Should be a wildcard (*), an ethernet
+# interface, or IP. The "host" option should point or resolve
+# to this address. (string value)
+#rpc_zmq_bind_address=*
+
+# MatchMaker driver. (string value)
+#rpc_zmq_matchmaker=oslo.messaging._drivers.matchmaker.MatchMakerLocalhost
+
+# ZeroMQ receiver listening port. (integer value)
+#rpc_zmq_port=9501
+
+# Number of ZeroMQ contexts, defaults to 1. (integer value)
+#rpc_zmq_contexts=1
+
+# Maximum number of ingress messages to locally buffer per
+# topic. Default is unlimited. (integer value)
+#rpc_zmq_topic_backlog=<None>
+
+# Directory for holding IPC sockets. (string value)
+#rpc_zmq_ipc_dir=/var/run/openstack
+
+# Name of this node. Must be a valid hostname, FQDN, or IP
+# address. Must match "host" option, if running Nova. (string
+# value)
+#rpc_zmq_host=ceilometer
+
+# Seconds to wait before a cast expires (TTL). Only supported
+# by impl_zmq. (integer value)
+#rpc_cast_timeout=30
+
+# Heartbeat frequency. (integer value)
+#matchmaker_heartbeat_freq=300
+
+# Heartbeat time-to-live. (integer value)
+#matchmaker_heartbeat_ttl=600
+
+# Host to locate redis. (string value)
+#host=127.0.0.1
+
+# Use this port to connect to redis host. (integer value)
+#port=6379
+
+# Password for Redis server (optional). (string value)
+#password=<None>
+
+# Size of RPC greenthread pool. (integer value)
+#rpc_thread_pool_size=64
+
+# Driver or drivers to handle sending notifications. (multi
+# valued)
+#notification_driver=
+
+# AMQP topic used for OpenStack notifications. (list value)
+# Deprecated group/name - [rpc_notifier2]/topics
+#notification_topics=notifications
+
+# Seconds to wait for a response from a call. (integer value)
+#rpc_response_timeout=60
+
+# A URL representing the messaging driver to use and its full
+# configuration. If not set, we fall back to the rpc_backend
+# option and driver specific configuration. (string value)
+#transport_url=<None>
+
+# The messaging driver to use, defaults to rabbit. Other
+# drivers include qpid and zmq. (string value)
+#rpc_backend=rabbit
+
+# The default exchange under which topics are scoped. May be
+# overridden by an exchange name specified in the
+# transport_url option. (string value)
+#control_exchange=openstack
+
+
+#
# Options defined in ceilometer.middleware
#
@@ -257,31 +451,6 @@
#
-# Options defined in ceilometer.openstack.common.notifier.api
-#
-
-# Driver or drivers to handle sending notifications (multi
-# valued)
-#notification_driver=
-
-# Default notification level for outgoing notifications
-# (string value)
-#default_notification_level=INFO
-
-# Default publisher_id for outgoing notifications (string
-# value)
-#default_publisher_id=<None>
-
-
-#
-# Options defined in ceilometer.openstack.common.notifier.rpc_notifier
-#
-
-# AMQP topic used for OpenStack notifications (list value)
-#notification_topics=notifications
-
-
-#
# Options defined in ceilometer.openstack.common.policy
#
@@ -294,196 +463,6 @@
#
-# Options defined in ceilometer.openstack.common.rpc
-#
-
-# The messaging module to use, defaults to kombu. (string
-# value)
-#rpc_backend=ceilometer.openstack.common.rpc.impl_kombu
-
-# Size of RPC thread pool (integer value)
-#rpc_thread_pool_size=64
-
-# Size of RPC connection pool (integer value)
-#rpc_conn_pool_size=30
-
-# Seconds to wait for a response from call or multicall
-# (integer value)
-#rpc_response_timeout=60
-
-# Seconds to wait before a cast expires (TTL). Only supported
-# by impl_zmq. (integer value)
-#rpc_cast_timeout=30
-
-# Modules of exceptions that are permitted to be recreated
-# upon receiving exception data from an rpc call. (list value)
-#allowed_rpc_exception_modules=nova.exception,cinder.exception,exceptions
-
-# If passed, use a fake RabbitMQ provider (boolean value)
-#fake_rabbit=false
-
-# AMQP exchange to connect to if using RabbitMQ or Qpid
-# (string value)
-#control_exchange=openstack
-
-
-#
-# Options defined in ceilometer.openstack.common.rpc.amqp
-#
-
-# Use durable queues in amqp. (boolean value)
-# Deprecated group/name - [DEFAULT]/rabbit_durable_queues
-#amqp_durable_queues=false
-
-# Auto-delete queues in amqp. (boolean value)
-#amqp_auto_delete=false
-
-
-#
-# Options defined in ceilometer.openstack.common.rpc.impl_kombu
-#
-
-# If SSL is enabled, the SSL version to use. Valid values are
-# TLSv1, SSLv23 and SSLv3. SSLv2 might be available on some
-# distributions. (string value)
-#kombu_ssl_version=
-
-# SSL key file (valid only if SSL enabled) (string value)
-#kombu_ssl_keyfile=
-
-# SSL cert file (valid only if SSL enabled) (string value)
-#kombu_ssl_certfile=
-
-# SSL certification authority file (valid only if SSL enabled)
-# (string value)
-#kombu_ssl_ca_certs=
-
-# The RabbitMQ broker address where a single node is used
-# (string value)
-#rabbit_host=localhost
-
-# The RabbitMQ broker port where a single node is used
-# (integer value)
-#rabbit_port=5672
-
-# RabbitMQ HA cluster host:port pairs (list value)
-#rabbit_hosts=$rabbit_host:$rabbit_port
-
-# Connect over SSL for RabbitMQ (boolean value)
-#rabbit_use_ssl=false
-
-# The RabbitMQ userid (string value)
-#rabbit_userid=guest
-
-# The RabbitMQ password (string value)
-#rabbit_password=guest
-
-# The RabbitMQ virtual host (string value)
-#rabbit_virtual_host=/
-
-# How frequently to retry connecting with RabbitMQ (integer
-# value)
-#rabbit_retry_interval=1
-
-# How long to backoff for between retries when connecting to
-# RabbitMQ (integer value)
-#rabbit_retry_backoff=2
-
-# Maximum number of RabbitMQ connection retries. Default is 0
-# (infinite retry count) (integer value)
-#rabbit_max_retries=0
-
-# Use HA queues in RabbitMQ (x-ha-policy: all). If you change
-# this option, you must wipe the RabbitMQ database. (boolean
-# value)
-#rabbit_ha_queues=false
-
-
-#
-# Options defined in ceilometer.openstack.common.rpc.impl_qpid
-#
-
-# Qpid broker hostname (string value)
-#qpid_hostname=localhost
-
-# Qpid broker port (integer value)
-#qpid_port=5672
-
-# Qpid HA cluster host:port pairs (list value)
-#qpid_hosts=$qpid_hostname:$qpid_port
-
-# Username for qpid connection (string value)
-#qpid_username=
-
-# Password for qpid connection (string value)
-#qpid_password=
-
-# Space separated list of SASL mechanisms to use for auth
-# (string value)
-#qpid_sasl_mechanisms=
-
-# Seconds between connection keepalive heartbeats (integer
-# value)
-#qpid_heartbeat=60
-
-# Transport to use, either 'tcp' or 'ssl' (string value)
-#qpid_protocol=tcp
-
-# Disable Nagle algorithm (boolean value)
-#qpid_tcp_nodelay=true
-
-# The qpid topology version to use. Version 1 is what was
-# originally used by impl_qpid. Version 2 includes some
-# backwards-incompatible changes that allow broker federation
-# to work. Users should update to version 2 when they are
-# able to take everything down, as it requires a clean break.
-# (integer value)
-#qpid_topology_version=1
-
-
-#
-# Options defined in ceilometer.openstack.common.rpc.impl_zmq
-#
-
-# ZeroMQ bind address. Should be a wildcard (*), an ethernet
-# interface, or IP. The "host" option should point or resolve
-# to this address. (string value)
-#rpc_zmq_bind_address=*
-
-# MatchMaker driver (string value)
-#rpc_zmq_matchmaker=ceilometer.openstack.common.rpc.matchmaker.MatchMakerLocalhost
-
-# ZeroMQ receiver listening port (integer value)
-#rpc_zmq_port=9501
-
-# Number of ZeroMQ contexts, defaults to 1 (integer value)
-#rpc_zmq_contexts=1
-
-# Maximum number of ingress messages to locally buffer per
-# topic. Default is unlimited. (integer value)
-#rpc_zmq_topic_backlog=<None>
-
-# Directory for holding IPC sockets (string value)
-#rpc_zmq_ipc_dir=/var/run/openstack
-
-# Name of this node. Must be a valid hostname, FQDN, or IP
-# address. Must match "host" option, if running Nova. (string
-# value)
-#rpc_zmq_host=ceilometer
-
-
-#
-# Options defined in ceilometer.openstack.common.rpc.matchmaker
-#
-
-# Heartbeat frequency (integer value)
-#matchmaker_heartbeat_freq=300
-
-# Heartbeat time-to-live. (integer value)
-#matchmaker_heartbeat_ttl=600
-
-
-#
# Options defined in ceilometer.orchestration.notifications
#
@@ -866,29 +845,13 @@
#enforce_token_bind=permissive
-[matchmaker_redis]
-
-#
-# Options defined in ceilometer.openstack.common.rpc.matchmaker_redis
-#
-
-# Host to locate redis (string value)
-#host=127.0.0.1
-
-# Use this port to connect to redis host. (integer value)
-#port=6379
-
-# Password for Redis server. (optional) (string value)
-#password=<None>
-
-
[matchmaker_ring]
#
-# Options defined in ceilometer.openstack.common.rpc.matchmaker_ring
+# Options defined in oslo.messaging
#
-# Matchmaker ring file (JSON) (string value)
+# Matchmaker ring file (JSON). (string value)
# Deprecated group/name - [DEFAULT]/matchmaker_ringfile
#ringfile=/etc/oslo/matchmaker_ring.json
@@ -930,16 +893,6 @@
#metering_topic=metering
-[rpc_notifier2]
-
-#
-# Options defined in ceilometer.openstack.common.notifier.rpc_notifier2
-#
-
-# AMQP topic(s) used for OpenStack notifications (list value)
-#topics=notifications
-
-
[service_credentials]
#
diff --git a/openstack-common.conf b/openstack-common.conf
index fa59260a..6a62f4ae 100644
--- a/openstack-common.conf
+++ b/openstack-common.conf
@@ -15,9 +15,7 @@ module=log_handler
module=loopingcall
module=middleware
module=network_utils
-module=notifier
module=policy
-module=rpc
module=service
module=strutils
module=threadgroup
diff --git a/requirements.txt b/requirements.txt
index ffe6ff05..8cca5ea0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,7 +8,6 @@ happybase>=0.5,!=0.7
iso8601>=0.1.9
jsonpath-rw>=1.2.0,<2.0
jsonschema>=2.0.0,<3.0.0
-kombu>=2.4.8
lockfile>=0.8
lxml>=2.3
msgpack-python
@@ -18,6 +17,7 @@ oslo.vmware>=0.2 # Apache-2.0
pbr>=0.6,<1.0
pecan>=0.4.5
posix_ipc
+oslo.messaging>=1.3.0
pysnmp>=4.2.1,<5.0.0
python-ceilometerclient>=1.0.6
python-glanceclient>=0.9.0
diff --git a/setup.cfg b/setup.cfg
index ef67325b..0958344e 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -201,6 +201,13 @@ ceilometer.dispatcher =
network.statistics.drivers =
opendaylight = ceilometer.network.statistics.opendaylight.driver:OpenDayLightDriver
+# These are for backwards compat with Havana notification_driver configuration values
+oslo.messaging.notify.drivers =
+ ceilometer.openstack.common.notifier.log_notifier = oslo.messaging.notify._impl_log:LogDriver
+ ceilometer.openstack.common.notifier.no_op_notifier = oslo.messaging.notify._impl_noop:NoOpDriver
+ ceilometer.openstack.common.notifier.rpc_notifier2 = oslo.messaging.notify._impl_messaging:MessagingV2Driver
+ ceilometer.openstack.common.notifier.rpc_notifier = oslo.messaging.notify._impl_messaging:MessagingDriver
+ ceilometer.openstack.common.notifier.test_notifier = oslo.messaging.notify._impl_test:TestDriver
[build_sphinx]
all_files = 1
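
The oslo.messaging.notify.drivers entry points registered above let deployments keep Havana-era notification_driver values in ceilometer.conf. For illustration, such a legacy name can be resolved through the same entry-point namespace with stevedore (whether oslo.messaging uses exactly this call internally is not shown here):

from stevedore import driver

# Resolve a Havana-style notification_driver value via the entry points above.
mgr = driver.DriverManager(
    namespace='oslo.messaging.notify.drivers',
    name='ceilometer.openstack.common.notifier.rpc_notifier')
print(mgr.driver)  # -> oslo.messaging.notify._impl_messaging:MessagingDriver
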
diff --git a/tools/config/oslo.config.generator.rc b/tools/config/oslo.config.generator.rc
index 13d8f331..35ac9874 100644
--- a/tools/config/oslo.config.generator.rc
+++ b/tools/config/oslo.config.generator.rc
@@ -1 +1,2 @@
-export CEILOMETER_CONFIG_GENERATOR_EXTRA_MODULES=keystoneclient.middleware.auth_token
+CEILOMETER_CONFIG_GENERATOR_EXTRA_LIBRARIES=oslo.messaging
+CEILOMETER_CONFIG_GENERATOR_EXTRA_MODULES=keystoneclient.middleware.auth_token