summaryrefslogtreecommitdiff
path: root/contrib
diff options
context:
space:
mode:
authorcbjchen@cn.ibm.com <cbjchen@cn.ibm.com>2014-01-14 14:48:14 +0800
committercbjchen@cn.ibm.com <cbjchen@cn.ibm.com>2014-01-25 23:51:30 +0800
commita61d8590391d69427a95589912bf6339f62a7a54 (patch)
treee30d777de359ad74b46d98825ea322a7dca030ad /contrib
parent834eb42e2e1aa0048a10708b5a598204da98ec1b (diff)
downloadheat-a61d8590391d69427a95589912bf6339f62a7a54.tar.gz
Marconi message queue resource implementation
This implements a Marconi backed native OpenStack message queue. Customers can create a OS::Marconi::Queue queue resource declaratively in templates and pass the href/endpoint of the queue to other resources by means of href attribute of the queue. Marconi bp, https://blueprints.launchpad.net/marconi/+spec/heat-template Implements: blueprint mqaas-marconi-resource Change-Id: Icbbb1869b352dbdba22530f9ec185652f4da75b6
Diffstat (limited to 'contrib')
-rw-r--r--contrib/marconi-plugin/README.md21
-rw-r--r--contrib/marconi-plugin/__init__.py0
-rw-r--r--contrib/marconi-plugin/plugin/__init__.py0
-rw-r--r--contrib/marconi-plugin/plugin/queue.py167
-rw-r--r--contrib/marconi-plugin/requirements.txt1
-rw-r--r--contrib/marconi-plugin/tests/__init__.py0
-rw-r--r--contrib/marconi-plugin/tests/test_queue.py241
7 files changed, 430 insertions, 0 deletions
diff --git a/contrib/marconi-plugin/README.md b/contrib/marconi-plugin/README.md
new file mode 100644
index 000000000..29b84801d
--- /dev/null
+++ b/contrib/marconi-plugin/README.md
@@ -0,0 +1,21 @@
+Marconi plugin for OpenStack Heat
+================================
+
+This plugin enable using Marconi queuing service as a resource in a Heat template.
+
+
+### 1. Install the Marconi plugin in Heat
+
+NOTE: Heat scans several directories to find plugins. The list of directories
+is specified in the configuration file "heat.conf" with the "plugin_dirs"
+directive.
+
+To install the Marconi plugin, one needs to first make sure the
+python-marconiclient package is installed - pip install -r requirements.txt, and
+copy the plugin implementation, e.g. queue.py to wherever plugin_dirs points to.
+
+
+### 2. Restart heat
+
+Only the process "heat-engine" needs to be restarted to load the newly installed
+plugin.
diff --git a/contrib/marconi-plugin/__init__.py b/contrib/marconi-plugin/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/contrib/marconi-plugin/__init__.py
diff --git a/contrib/marconi-plugin/plugin/__init__.py b/contrib/marconi-plugin/plugin/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/contrib/marconi-plugin/plugin/__init__.py
diff --git a/contrib/marconi-plugin/plugin/queue.py b/contrib/marconi-plugin/plugin/queue.py
new file mode 100644
index 000000000..d427b1bbe
--- /dev/null
+++ b/contrib/marconi-plugin/plugin/queue.py
@@ -0,0 +1,167 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from heat.common import exception
+from heat.engine import clients
+from heat.engine import properties
+from heat.engine import resource
+from heat.openstack.common import log as logging
+
+
+logger = logging.getLogger(__name__)
+
+
+try:
+ from marconiclient.queues.v1 import client as marconiclient
+except ImportError:
+ marconiclient = None
+ logger.info(_('marconiclient not available'))
+
+ def resource_mapping():
+ return {}
+else:
+ def resource_mapping():
+ return {
+ 'OS::Marconi::Queue': MarconiQueue,
+ }
+
+
+class Clients(clients.OpenStackClients):
+ '''
+ Convenience class to create and cache client instances.
+ '''
+ def __init__(self, context):
+ super(Clients, self).__init__(context)
+ self._marconi = None
+
+ def marconi(self, service_type="queuing"):
+ if self._marconi:
+ return self._marconi
+
+ con = self.context
+ if self.auth_token is None:
+ logger.error(_("Marconi connection failed, no auth_token!"))
+ return None
+
+ opts = {
+ 'os_auth_token': con.auth_token,
+ 'os_auth_url': con.auth_url,
+ 'os_project_id': con.tenant,
+ 'os_service_type': service_type,
+ }
+ auth_opts = {'backend': 'keystone',
+ 'options': opts}
+ conf = {'auth_opts': auth_opts}
+ endpoint = self.url_for(service_type=service_type)
+
+ self._marconi = marconiclient.Client(url=endpoint, conf=conf)
+
+ return self._marconi
+
+
+class MarconiQueue(resource.Resource):
+
+ PROPERTIES = (
+ NAME, METADATA,
+ ) = (
+ 'name', 'metadata',
+ )
+
+ properties_schema = {
+ NAME: properties.Schema(
+ properties.Schema.STRING,
+ _("Name of the queue instance to create."),
+ required=True),
+ METADATA: properties.Schema(
+ properties.Schema.MAP,
+ description=_("Arbitrary key/value metadata to store "
+ "contextual information about this queue."),
+ update_allowed=True)
+ }
+
+ attributes_schema = {
+ "queue_id": _("ID of the queue."),
+ "href": _("The resource href of the queue.")
+ }
+
+ update_allowed_keys = ('Properties',)
+
+ def __init__(self, name, json_snippet, stack):
+ super(MarconiQueue, self).__init__(name, json_snippet, stack)
+ self.clients = Clients(self.context)
+
+ def marconi(self):
+ return self.clients.marconi()
+
+ def physical_resource_name(self):
+ return self.properties[self.NAME]
+
+ def handle_create(self):
+ '''
+ Create a marconi message queue.
+ '''
+ queue_name = self.physical_resource_name()
+ queue = self.marconi().queue(queue_name, auto_create=False)
+ # Marconi client doesn't report an error if an queue with the same
+ # id/name already exists, which can cause issue with stack update.
+ if queue.exists():
+ raise exception.Error(_('Message queue %s already exists.')
+ % queue_name)
+ queue.ensure_exists()
+ self.resource_id_set(queue_name)
+ return queue
+
+ def check_create_complete(self, queue):
+ # set metadata of the newly created queue
+ if queue.exists():
+ metadata = self.properties.get('metadata')
+ if metadata:
+ queue.metadata(new_meta=metadata)
+ return True
+
+ queue_name = self.physical_resource_name()
+ raise exception.Error(_('Message queue %s creation failed.')
+ % queue_name)
+
+ def handle_update(self, json_snippet, tmpl_diff, prop_diff):
+ '''
+ Update queue metadata.
+ '''
+ if 'metadata' in prop_diff:
+ queue = self.marconi().queue(self.resource_id, auto_create=False)
+ metadata = prop_diff['metadata']
+ queue.metadata(new_meta=metadata)
+
+ def handle_delete(self):
+ '''
+ Delete a marconi message queue.
+ '''
+ if not self.resource_id:
+ return
+
+ queue = self.marconi().queue(self.resource_id, auto_create=False)
+ queue.delete()
+
+ def href(self):
+ api_endpoint = self.marconi().api_url
+ queue_name = self.physical_resource_name()
+ if api_endpoint.endswith('/'):
+ return '%squeues/%s' % (api_endpoint, queue_name)
+ else:
+ return '%s/queues/%s' % (api_endpoint, queue_name)
+
+ def _resolve_attribute(self, name):
+ if name == 'queue_id':
+ return self.resource_id
+ elif name == 'href':
+ return self.href()
diff --git a/contrib/marconi-plugin/requirements.txt b/contrib/marconi-plugin/requirements.txt
new file mode 100644
index 000000000..0bd5c0f86
--- /dev/null
+++ b/contrib/marconi-plugin/requirements.txt
@@ -0,0 +1 @@
+python-marconiclient>=0.0.1a1
diff --git a/contrib/marconi-plugin/tests/__init__.py b/contrib/marconi-plugin/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/contrib/marconi-plugin/tests/__init__.py
diff --git a/contrib/marconi-plugin/tests/test_queue.py b/contrib/marconi-plugin/tests/test_queue.py
new file mode 100644
index 000000000..f48a466d0
--- /dev/null
+++ b/contrib/marconi-plugin/tests/test_queue.py
@@ -0,0 +1,241 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from heat.common import exception
+from heat.common import template_format
+from heat.engine import parser
+from heat.engine import resource
+from heat.engine import scheduler
+from heat.tests.common import HeatTestCase
+from heat.tests import utils
+
+from ..plugin import queue # noqa
+
+wp_template = '''
+{
+ "AWSTemplateFormatVersion" : "2010-09-09",
+ "Description" : "openstack Marconi queue service as a resource",
+ "Resources" : {
+ "MyQueue2" : {
+ "Type" : "OS::Marconi::Queue",
+ "Properties" : {
+ "name": "myqueue",
+ "metadata": { "key1": { "key2": "value", "key3": [1, 2] } }
+ }
+ }
+ },
+ "Outputs" : {
+ "queue_id": {
+ "Value": { "Fn::GetAtt" : [ "MyQueue2", "queue_id" ]},
+ "Description": "queue name"
+ },
+ "queue_href": {
+ "Value": { "Fn::GetAtt" : [ "MyQueue2", "href" ]},
+ "Description": "queue href"
+ }
+ }
+}
+'''
+
+
+class FakeQueue(object):
+ def __init__(self, queue_name, auto_create=True):
+ self._id = queue_name
+ self._auto_create = auto_create
+ self._exists = False
+
+ def exists(self):
+ return self._exists
+
+ def ensure_exists(self):
+ self._exists = True
+
+ def metadata(self, new_meta=None):
+ pass
+
+ def delete(self):
+ pass
+
+
+class MarconiMessageQueueTest(HeatTestCase):
+ def setUp(self):
+ super(MarconiMessageQueueTest, self).setUp()
+ self.fc = self.m.CreateMockAnything()
+ utils.setup_dummy_db()
+ self.ctx = utils.dummy_context()
+ resource._register_class("OS::Marconi::Queue",
+ queue.MarconiQueue)
+
+ def parse_stack(self, t):
+ stack_name = 'test_stack'
+ tmpl = parser.Template(t)
+ self.stack = parser.Stack(self.ctx, stack_name, tmpl)
+ self.stack.validate()
+ self.stack.store()
+
+ @utils.stack_delete_after
+ def test_create(self):
+ t = template_format.parse(wp_template)
+ self.parse_stack(t)
+
+ queue = self.stack['MyQueue2']
+ self.m.StubOutWithMock(queue, 'marconi')
+ queue.marconi().MultipleTimes().AndReturn(self.fc)
+
+ fake_q = FakeQueue(queue.physical_resource_name(), auto_create=False)
+ self.m.StubOutWithMock(self.fc, 'queue')
+ self.fc.queue(queue.physical_resource_name(),
+ auto_create=False).AndReturn(fake_q)
+ self.m.StubOutWithMock(fake_q, 'exists')
+ fake_q.exists().AndReturn(False)
+ self.m.StubOutWithMock(fake_q, 'ensure_exists')
+ fake_q.ensure_exists()
+ fake_q.exists().AndReturn(True)
+ self.m.StubOutWithMock(fake_q, 'metadata')
+ fake_q.metadata(new_meta=queue.properties.get('metadata'))
+
+ self.m.ReplayAll()
+
+ scheduler.TaskRunner(queue.create)()
+ self.fc.api_url = 'http://127.0.0.1:8888/v1'
+ self.assertEqual('myqueue', queue.FnGetAtt('queue_id'))
+ self.assertEqual('http://127.0.0.1:8888/v1/queues/myqueue',
+ queue.FnGetAtt('href'))
+
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_create_existing_queue(self):
+ t = template_format.parse(wp_template)
+ self.parse_stack(t)
+
+ queue = self.stack['MyQueue2']
+ self.m.StubOutWithMock(queue, 'marconi')
+ queue.marconi().MultipleTimes().AndReturn(self.fc)
+
+ fake_q = FakeQueue("myqueue", auto_create=False)
+ self.m.StubOutWithMock(self.fc, 'queue')
+ self.fc.queue("myqueue", auto_create=False).AndReturn(fake_q)
+ self.m.StubOutWithMock(fake_q, 'exists')
+ fake_q.exists().AndReturn(True)
+ self.m.ReplayAll()
+
+ err = self.assertRaises(exception.ResourceFailure,
+ scheduler.TaskRunner(queue.create))
+ self.assertEqual("Error: Message queue myqueue already exists.",
+ str(err))
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_create_failed(self):
+ t = template_format.parse(wp_template)
+ self.parse_stack(t)
+
+ queue = self.stack['MyQueue2']
+ self.m.StubOutWithMock(queue, 'marconi')
+ queue.marconi().MultipleTimes().AndReturn(self.fc)
+
+ fake_q = FakeQueue("myqueue", auto_create=False)
+ self.m.StubOutWithMock(self.fc, 'queue')
+ self.fc.queue("myqueue", auto_create=False).AndReturn(fake_q)
+ self.m.StubOutWithMock(fake_q, 'exists')
+ fake_q.exists().AndReturn(False)
+ self.m.StubOutWithMock(fake_q, 'ensure_exists')
+ fake_q.ensure_exists()
+ fake_q.exists().AndReturn(False)
+
+ self.m.ReplayAll()
+
+ err = self.assertRaises(exception.ResourceFailure,
+ scheduler.TaskRunner(queue.create))
+ self.assertEqual("Error: Message queue myqueue creation failed.",
+ str(err))
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_delete(self):
+ t = template_format.parse(wp_template)
+ self.parse_stack(t)
+
+ queue = self.stack['MyQueue2']
+ queue.resource_id_set(queue.properties.get('name'))
+ self.m.StubOutWithMock(queue, 'marconi')
+ queue.marconi().MultipleTimes().AndReturn(self.fc)
+
+ fake_q = FakeQueue("myqueue", auto_create=False)
+ self.m.StubOutWithMock(self.fc, 'queue')
+ self.fc.queue("myqueue",
+ auto_create=False).MultipleTimes().AndReturn(fake_q)
+ self.m.StubOutWithMock(fake_q, 'delete')
+ fake_q.delete()
+
+ self.m.ReplayAll()
+
+ scheduler.TaskRunner(queue.create)()
+ scheduler.TaskRunner(queue.delete)()
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_update_in_place(self):
+ t = template_format.parse(wp_template)
+ self.parse_stack(t)
+ queue = self.stack['MyQueue2']
+ queue.resource_id_set(queue.properties.get('name'))
+ self.m.StubOutWithMock(queue, 'marconi')
+ queue.marconi().MultipleTimes().AndReturn(self.fc)
+ fake_q = FakeQueue('myqueue', auto_create=False)
+ self.m.StubOutWithMock(self.fc, 'queue')
+ self.fc.queue('myqueue',
+ auto_create=False).MultipleTimes().AndReturn(fake_q)
+ self.m.StubOutWithMock(fake_q, 'metadata')
+ fake_q.metadata(new_meta={"key1": {"key2": "value", "key3": [1, 2]}})
+
+ # Expected to be called during update
+ fake_q.metadata(new_meta={'key1': 'value'})
+
+ self.m.ReplayAll()
+
+ t = template_format.parse(wp_template)
+ new_queue = t['Resources']['MyQueue2']
+ new_queue['Properties']['metadata'] = {'key1': 'value'}
+
+ scheduler.TaskRunner(queue.create)()
+ scheduler.TaskRunner(queue.update, new_queue)()
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_update_replace(self):
+ t = template_format.parse(wp_template)
+ self.parse_stack(t)
+ queue = self.stack['MyQueue2']
+ queue.resource_id_set(queue.properties.get('name'))
+ self.m.StubOutWithMock(queue, 'marconi')
+ queue.marconi().MultipleTimes().AndReturn(self.fc)
+ fake_q = FakeQueue('myqueue', auto_create=False)
+ self.m.StubOutWithMock(self.fc, 'queue')
+ self.fc.queue('myqueue',
+ auto_create=False).MultipleTimes().AndReturn(fake_q)
+
+ self.m.ReplayAll()
+
+ t = template_format.parse(wp_template)
+ t['Resources']['MyQueue2']['Properties']['name'] = 'new_queue'
+ new_queue = t['Resources']['MyQueue2']
+
+ scheduler.TaskRunner(queue.create)()
+ err = self.assertRaises(resource.UpdateReplace,
+ scheduler.TaskRunner(queue.update,
+ new_queue))
+ msg = 'The Resource MyQueue2 requires replacement.'
+ self.assertEqual(msg, str(err))
+
+ self.m.VerifyAll()