summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-03-31 20:16:16 +0000
committerGerrit Code Review <review@openstack.org>2015-03-31 20:16:16 +0000
commit9d8b5a75b05b884d9281e94220699b3d89b2ca1d (patch)
tree5cdf7d6553e0887b495af378ca3234fc3c6a3c94
parente72bd4c8bcba6d8803473ba0ef2a2f662c7140c9 (diff)
parentc45e680277ecee3037c11956fcddeac6ff5f7800 (diff)
downloadglance-9d8b5a75b05b884d9281e94220699b3d89b2ca1d.tar.gz
Merge "Catalog Index Service - Index Update"
-rw-r--r--glance/cmd/agent_notification.py30
-rw-r--r--glance/listener.py90
-rw-r--r--glance/search/plugins/base.py21
-rw-r--r--glance/search/plugins/images.py11
-rw-r--r--glance/search/plugins/images_notification_handler.py83
-rw-r--r--glance/search/plugins/metadefs.py29
-rw-r--r--glance/search/plugins/metadefs_notification_handler.py251
-rw-r--r--glance/service.py107
8 files changed, 622 insertions, 0 deletions
diff --git a/glance/cmd/agent_notification.py b/glance/cmd/agent_notification.py
new file mode 100644
index 000000000..6d643f1d3
--- /dev/null
+++ b/glance/cmd/agent_notification.py
@@ -0,0 +1,30 @@
+# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from glance import listener
+from glance.openstack.common import service as os_service
+from glance import service
+
+
+def main():
+ service.prepare_service()
+ launcher = os_service.ProcessLauncher()
+ launcher.launch_service(
+ listener.ListenerService(),
+ workers=service.get_workers('listener'))
+ launcher.wait()
+
+if __name__ == "__main__":
+ main()
diff --git a/glance/listener.py b/glance/listener.py
new file mode 100644
index 000000000..a9b790eed
--- /dev/null
+++ b/glance/listener.py
@@ -0,0 +1,90 @@
+# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from oslo.config import cfg
+from oslo import messaging
+from oslo_log import log as logging
+import stevedore
+
+from glance import i18n
+from glance.openstack.common import service as os_service
+
+LOG = logging.getLogger(__name__)
+_ = i18n._
+_LE = i18n._LE
+
+
+class NotificationEndpoint(object):
+
+ def __init__(self):
+ self.plugins = get_plugins()
+ self.notification_target_map = dict()
+ for plugin in self.plugins:
+ try:
+ event_list = plugin.obj.get_notification_supported_events()
+ for event in event_list:
+ self.notification_target_map[event.lower()] = plugin.obj
+ except Exception as e:
+ LOG.error(_LE("Failed to retrieve supported notification"
+ " events from search plugins "
+ "%(ext)s: %(e)s") %
+ {'ext': plugin.name, 'e': e})
+
+ def info(self, ctxt, publisher_id, event_type, payload, metadata):
+ event_type_l = event_type.lower()
+ if event_type_l in self.notification_target_map:
+ plugin = self.notification_target_map[event_type_l]
+ handler = plugin.get_notification_handler()
+ handler.process(
+ ctxt,
+ publisher_id,
+ event_type,
+ payload,
+ metadata)
+
+
+class ListenerService(os_service.Service):
+ def __init__(self, *args, **kwargs):
+ super(ListenerService, self).__init__(*args, **kwargs)
+ self.listeners = []
+
+ def start(self):
+ super(ListenerService, self).start()
+ transport = messaging.get_transport(cfg.CONF)
+ targets = [
+ messaging.Target(topic="notifications", exchange="glance")
+ ]
+ endpoints = [
+ NotificationEndpoint()
+ ]
+ listener = messaging.get_notification_listener(
+ transport,
+ targets,
+ endpoints)
+ listener.start()
+ self.listeners.append(listener)
+
+ def stop(self):
+ for listener in self.listeners:
+ listener.stop()
+ listener.wait()
+ super(ListenerService, self).stop()
+
+
+def get_plugins():
+ namespace = 'glance.search.index_backend'
+ ext_manager = stevedore.extension.ExtensionManager(
+ namespace, invoke_on_load=True)
+ return ext_manager.extensions
diff --git a/glance/search/plugins/base.py b/glance/search/plugins/base.py
index b2ec6c4fa..ac7dcc1d2 100644
--- a/glance/search/plugins/base.py
+++ b/glance/search/plugins/base.py
@@ -117,3 +117,24 @@ class IndexBase(object):
def get_mapping(self):
"""Get an index mapping."""
return {}
+
+ def get_notification_handler(self):
+ """Get the notification handler which implements NotificationBase."""
+ return None
+
+ def get_notification_supported_events(self):
+ """Get the list of suppported event types."""
+ return []
+
+
+@six.add_metaclass(abc.ABCMeta)
+class NotificationBase(object):
+
+ def __init__(self, engine, index_name, document_type):
+ self.engine = engine
+ self.index_name = index_name
+ self.document_type = document_type
+
+ @abc.abstractmethod
+ def process(self, ctxt, publisher_id, event_type, payload, metadata):
+ """Process the incoming notification message."""
diff --git a/glance/search/plugins/images.py b/glance/search/plugins/images.py
index a46fc1876..6b0d68217 100644
--- a/glance/search/plugins/images.py
+++ b/glance/search/plugins/images.py
@@ -22,6 +22,7 @@ from glance.common import property_utils
import glance.db
from glance.db.sqlalchemy import models
from glance.search.plugins import base
+from glance.search.plugins import images_notification_handler
class ImageIndex(base.IndexBase):
@@ -150,3 +151,13 @@ class ImageIndex(base.IndexBase):
document[image_property.name] = image_property.value
return document
+
+ def get_notification_handler(self):
+ return images_notification_handler.ImageHandler(
+ self.engine,
+ self.get_index_name(),
+ self.get_document_type()
+ )
+
+ def get_notification_supported_events(self):
+ return ['image.create', 'image.update', 'image.delete']
diff --git a/glance/search/plugins/images_notification_handler.py b/glance/search/plugins/images_notification_handler.py
new file mode 100644
index 000000000..d5346db03
--- /dev/null
+++ b/glance/search/plugins/images_notification_handler.py
@@ -0,0 +1,83 @@
+# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from oslo_log import log as logging
+import oslo_messaging
+
+from glance.common import utils
+from glance.search.plugins import base
+
+LOG = logging.getLogger(__name__)
+
+
+class ImageHandler(base.NotificationBase):
+
+ def __init__(self, *args, **kwargs):
+ super(ImageHandler, self).__init__(*args, **kwargs)
+ self.image_delete_keys = ['deleted_at', 'deleted',
+ 'is_public', 'properties']
+
+ def process(self, ctxt, publisher_id, event_type, payload, metadata):
+ try:
+ actions = {
+ "image.create": self.create,
+ "image.update": self.update,
+ "image.delete": self.delete
+ }
+ actions[event_type](payload)
+ return oslo_messaging.NotificationResult.HANDLED
+ except Exception as e:
+ LOG.error(utils.exception_to_str(e))
+
+ def create(self, payload):
+ id = payload['id']
+ payload = self.format_image(payload)
+ self.engine.create(
+ index=self.index_name,
+ doc_type=self.document_type,
+ body=payload,
+ id=id
+ )
+
+ def update(self, payload):
+ id = payload['id']
+ payload = self.format_image(payload)
+ doc = {"doc": payload}
+ self.engine.update(
+ index=self.index_name,
+ doc_type=self.document_type,
+ body=doc,
+ id=id
+ )
+
+ def delete(self, payload):
+ id = payload['id']
+ self.engine.delete(
+ index=self.index_name,
+ doc_type=self.document_type,
+ id=id
+ )
+
+ def format_image(self, payload):
+ visibility = 'public' if payload['is_public'] else 'private'
+ payload['visibility'] = visibility
+
+ payload.update(payload.get('properties', '{}'))
+
+ for key in payload.keys():
+ if key in self.image_delete_keys:
+ del payload[key]
+
+ return payload
diff --git a/glance/search/plugins/metadefs.py b/glance/search/plugins/metadefs.py
index 3ff5d86ed..0b6314321 100644
--- a/glance/search/plugins/metadefs.py
+++ b/glance/search/plugins/metadefs.py
@@ -20,6 +20,7 @@ import six
import glance.db
from glance.db.sqlalchemy import models_metadef as models
from glance.search.plugins import base
+from glance.search.plugins import metadefs_notification_handler
class MetadefIndex(base.IndexBase):
@@ -228,3 +229,31 @@ class MetadefIndex(base.IndexBase):
return {
'name': tag.name
}
+
+ def get_notification_handler(self):
+ return metadefs_notification_handler.MetadefHandler(
+ self.engine,
+ self.get_index_name(),
+ self.get_document_type()
+ )
+
+ def get_notification_supported_events(self):
+ return [
+ "metadef_namespace.create",
+ "metadef_namespace.update",
+ "metadef_namespace.delete",
+ "metadef_object.create",
+ "metadef_object.update",
+ "metadef_object.delete",
+ "metadef_property.create",
+ "metadef_property.update",
+ "metadef_property.delete",
+ "metadef_tag.create",
+ "metadef_tag.update",
+ "metadef_tag.delete",
+ "metadef_resource_type.create",
+ "metadef_resource_type.delete",
+ "metadef_namespace.delete_properties",
+ "metadef_namespace.delete_objects",
+ "metadef_namespace.delete_tags"
+ ]
diff --git a/glance/search/plugins/metadefs_notification_handler.py b/glance/search/plugins/metadefs_notification_handler.py
new file mode 100644
index 000000000..18b168726
--- /dev/null
+++ b/glance/search/plugins/metadefs_notification_handler.py
@@ -0,0 +1,251 @@
+# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import six
+
+from oslo_log import log as logging
+import oslo_messaging
+
+from glance.common import utils
+from glance.search.plugins import base
+
+LOG = logging.getLogger(__name__)
+
+
+class MetadefHandler(base.NotificationBase):
+
+ def __init__(self, *args, **kwargs):
+ super(MetadefHandler, self).__init__(*args, **kwargs)
+ self.namespace_delete_keys = ['deleted_at', 'deleted', 'created_at',
+ 'updated_at', 'namespace_old']
+ self.property_delete_keys = ['deleted', 'deleted_at',
+ 'name_old', 'namespace', 'name']
+
+ def process(self, ctxt, publisher_id, event_type, payload, metadata):
+ try:
+ actions = {
+ "metadef_namespace.create": self.create_ns,
+ "metadef_namespace.update": self.update_ns,
+ "metadef_namespace.delete": self.delete_ns,
+ "metadef_object.create": self.create_obj,
+ "metadef_object.update": self.update_obj,
+ "metadef_object.delete": self.delete_obj,
+ "metadef_property.create": self.create_prop,
+ "metadef_property.update": self.update_prop,
+ "metadef_property.delete": self.delete_prop,
+ "metadef_resource_type.create": self.create_rs,
+ "metadef_resource_type.delete": self.delete_rs,
+ "metadef_tag.create": self.create_tag,
+ "metadef_tag.update": self.update_tag,
+ "metadef_tag.delete": self.delete_tag,
+ "metadef_namespace.delete_properties": self.delete_props,
+ "metadef_namespace.delete_objects": self.delete_objects,
+ "metadef_namespace.delete_tags": self.delete_tags
+ }
+ actions[event_type](payload)
+ return oslo_messaging.NotificationResult.HANDLED
+ except Exception as e:
+ LOG.error(utils.exception_to_str(e))
+
+ def run_create(self, id, payload):
+ self.engine.create(
+ index=self.index_name,
+ doc_type=self.document_type,
+ body=payload,
+ id=id
+ )
+
+ def run_update(self, id, payload, script=False):
+ if script:
+ self.engine.update(
+ index=self.index_name,
+ doc_type=self.document_type,
+ body=payload,
+ id=id)
+ else:
+ doc = {"doc": payload}
+ self.engine.update(
+ index=self.index_name,
+ doc_type=self.document_type,
+ body=doc,
+ id=id)
+
+ def run_delete(self, id):
+ self.engine.delete(
+ index=self.index_name,
+ doc_type=self.document_type,
+ id=id
+ )
+
+ def create_ns(self, payload):
+ id = payload['namespace']
+ self.run_create(id, self.format_namespace(payload))
+
+ def update_ns(self, payload):
+ id = payload['namespace_old']
+ self.run_update(id, self.format_namespace(payload))
+
+ def delete_ns(self, payload):
+ id = payload['namespace']
+ self.run_delete(id)
+
+ def create_obj(self, payload):
+ id = payload['namespace']
+ object = self.format_object(payload)
+ self.create_entity(id, "objects", object)
+
+ def update_obj(self, payload):
+ id = payload['namespace']
+ object = self.format_object(payload)
+ self.update_entity(id, "objects", object,
+ payload['name_old'], "name")
+
+ def delete_obj(self, payload):
+ id = payload['namespace']
+ self.delete_entity(id, "objects", payload['name'], "name")
+
+ def create_prop(self, payload):
+ id = payload['namespace']
+ property = self.format_property(payload)
+ self.create_entity(id, "properties", property)
+
+ def update_prop(self, payload):
+ id = payload['namespace']
+ property = self.format_property(payload)
+ self.update_entity(id, "properties", property,
+ payload['name_old'], "property")
+
+ def delete_prop(self, payload):
+ id = payload['namespace']
+ self.delete_entity(id, "properties", payload['name'], "property")
+
+ def create_rs(self, payload):
+ id = payload['namespace']
+ resource_type = dict()
+ resource_type['name'] = payload['name']
+ if payload['prefix']:
+ resource_type['prefix'] = payload['prefix']
+ if payload['properties_target']:
+ resource_type['properties_target'] = payload['properties_target']
+
+ self.create_entity(id, "resource_types", resource_type)
+
+ def delete_rs(self, payload):
+ id = payload['namespace']
+ self.delete_entity(id, "resource_types", payload['name'], "name")
+
+ def create_tag(self, payload):
+ id = payload['namespace']
+ tag = dict()
+ tag['name'] = payload['name']
+
+ self.create_entity(id, "tags", tag)
+
+ def update_tag(self, payload):
+ id = payload['namespace']
+ tag = dict()
+ tag['name'] = payload['name']
+
+ self.update_entity(id, "tags", tag, payload['name_old'], "name")
+
+ def delete_tag(self, payload):
+ id = payload['namespace']
+ self.delete_entity(id, "tags", payload['name'], "name")
+
+ def delete_props(self, payload):
+ self.delete_field(payload, "properties")
+
+ def delete_objects(self, payload):
+ self.delete_field(payload, "objects")
+
+ def delete_tags(self, payload):
+ self.delete_field(payload, "tags")
+
+ def create_entity(self, id, entity, entity_data):
+ script = ("if (ctx._source.containsKey('%(entity)s'))"
+ "{ctx._source.%(entity)s += entity_item }"
+ "else {ctx._source.%(entity)s=entity_list};" %
+ {"entity": entity})
+
+ params = {
+ "entity_item": entity_data,
+ "entity_list": [entity_data]
+ }
+ payload = {"script": script, "params": params}
+ self.run_update(id, payload=payload, script=True)
+
+ def update_entity(self, id, entity, entity_data, entity_id, field_name):
+ entity_id = entity_id.lower()
+ script = ("obj=null; for(entity_item :ctx._source.%(entity)s)"
+ "{if(entity_item['%(field_name)s'].toLowerCase() "
+ " == entity_id ) obj=entity_item;};"
+ "if(obj!=null)ctx._source.%(entity)s.remove(obj);"
+ "if (ctx._source.containsKey('%(entity)s'))"
+ "{ctx._source.%(entity)s += entity_item; }"
+ "else {ctx._source.%(entity)s=entity_list;}" %
+ {"entity": entity, "field_name": field_name})
+ params = {
+ "entity_item": entity_data,
+ "entity_list": [entity_data],
+ "entity_id": entity_id
+ }
+ payload = {"script": script, "params": params}
+ self.run_update(id, payload=payload, script=True)
+
+ def delete_entity(self, id, entity, entity_id, field_name):
+ entity_id = entity_id.lower()
+ script = ("obj=null; for(entity_item :ctx._source.%(entity)s)"
+ "{if(entity_item['%(field_name)s'].toLowerCase() "
+ " == entity_id ) obj=entity_item;};"
+ "if(obj!=null)ctx._source.%(entity)s.remove(obj);" %
+ {"entity": entity, "field_name": field_name})
+ params = {
+ "entity_id": entity_id
+ }
+ payload = {"script": script, "params": params}
+ self.run_update(id, payload=payload, script=True)
+
+ def delete_field(self, payload, field):
+ id = payload['namespace']
+ script = ("if (ctx._source.containsKey('%(field)s'))"
+ "{ctx._source.remove('%(field)s')}") % {"field": field}
+ payload = {"script": script}
+ self.run_update(id, payload=payload, script=True)
+
+ def format_namespace(self, payload):
+ for key in self.namespace_delete_keys:
+ if key in payload.keys():
+ del payload[key]
+ return payload
+
+ def format_object(self, payload):
+ formatted_object = dict()
+ formatted_object['name'] = payload['name']
+ formatted_object['description'] = payload['description']
+ if payload['required']:
+ formatted_object['required'] = payload['required']
+ formatted_object['properties'] = []
+ for property in payload['properties']:
+ formatted_property = self.format_property(property)
+ formatted_object['properties'].append(formatted_property)
+ return formatted_object
+
+ def format_property(self, payload):
+ prop_data = dict()
+ prop_data['property'] = payload['name']
+ for key, value in six.iteritems(payload):
+ if key not in self.property_delete_keys and value:
+ prop_data[key] = value
+ return prop_data
diff --git a/glance/service.py b/glance/service.py
new file mode 100644
index 000000000..12c62237f
--- /dev/null
+++ b/glance/service.py
@@ -0,0 +1,107 @@
+#!/usr/bin/env python
+#
+# Copyright 2012-2014 eNovance <licensing@enovance.com>
+# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import socket
+import sys
+
+from oslo.config import cfg
+from oslo import i18n
+import oslo.messaging
+from oslo_log import log
+
+CONF = cfg.CONF
+
+OPTS = [
+ cfg.StrOpt('host',
+ default=socket.gethostname(),
+ help='Name of this node, which must be valid in an AMQP '
+ 'key. Can be an opaque identifier. For ZeroMQ only, must '
+ 'be a valid host name, FQDN, or IP address.'),
+ cfg.IntOpt('listener_workers',
+ default=1,
+ help='Number of workers for notification service. A single '
+ 'notification agent is enabled by default.'),
+ cfg.IntOpt('http_timeout',
+ default=600,
+ help='Timeout seconds for HTTP requests. Set it to None to '
+ 'disable timeout.'),
+]
+CONF.register_opts(OPTS)
+
+CLI_OPTS = [
+ cfg.StrOpt('os-username',
+ deprecated_group="DEFAULT",
+ default=os.environ.get('OS_USERNAME', 'glance'),
+ help='User name to use for OpenStack service access.'),
+ cfg.StrOpt('os-password',
+ deprecated_group="DEFAULT",
+ secret=True,
+ default=os.environ.get('OS_PASSWORD', 'admin'),
+ help='Password to use for OpenStack service access.'),
+ cfg.StrOpt('os-tenant-id',
+ deprecated_group="DEFAULT",
+ default=os.environ.get('OS_TENANT_ID', ''),
+ help='Tenant ID to use for OpenStack service access.'),
+ cfg.StrOpt('os-tenant-name',
+ deprecated_group="DEFAULT",
+ default=os.environ.get('OS_TENANT_NAME', 'admin'),
+ help='Tenant name to use for OpenStack service access.'),
+ cfg.StrOpt('os-cacert',
+ default=os.environ.get('OS_CACERT'),
+ help='Certificate chain for SSL validation.'),
+ cfg.StrOpt('os-auth-url',
+ deprecated_group="DEFAULT",
+ default=os.environ.get('OS_AUTH_URL',
+ 'http://localhost:5000/v2.0'),
+ help='Auth URL to use for OpenStack service access.'),
+ cfg.StrOpt('os-region-name',
+ deprecated_group="DEFAULT",
+ default=os.environ.get('OS_REGION_NAME'),
+ help='Region name to use for OpenStack service endpoints.'),
+ cfg.StrOpt('os-endpoint-type',
+ default=os.environ.get('OS_ENDPOINT_TYPE', 'publicURL'),
+ help='Type of endpoint in Identity service catalog to use for '
+ 'communication with OpenStack services.'),
+ cfg.BoolOpt('insecure',
+ default=False,
+ help='Disables X.509 certificate validation when an '
+ 'SSL connection to Identity Service is established.'),
+]
+CONF.register_cli_opts(CLI_OPTS, group="service_credentials")
+
+LOG = log.getLogger(__name__)
+_DEFAULT_LOG_LEVELS = ['keystonemiddleware=WARN', 'stevedore=WARN']
+
+
+class WorkerException(Exception):
+ """Exception for errors relating to service workers."""
+
+
+def get_workers(name):
+ return 1
+
+
+def prepare_service(argv=None):
+ i18n.enable_lazy()
+ log.set_defaults(_DEFAULT_LOG_LEVELS)
+ log.register_options(CONF)
+ if argv is None:
+ argv = sys.argv
+ CONF(argv[1:], project='glance-search')
+ log.setup(cfg.CONF, 'glance-search')
+ oslo.messaging.set_transport_defaults('glance')