diff options
author | Zuul <zuul@review.opendev.org> | 2021-12-08 16:35:54 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2021-12-08 16:35:54 +0000 |
commit | c4d2f4b8c63d86233fdf863ae4c4fa21d95df65c (patch) | |
tree | f971b7e1610e6f00651846183a59ef01cec0f658 /ironic/conductor | |
parent | b37ee7c91133d4970208f14833d8f732de9b5911 (diff) | |
parent | 9a6f2d101ba734126b3e7f0d475b747ebcb84b62 (diff) | |
download | ironic-c4d2f4b8c63d86233fdf863ae4c4fa21d95df65c.tar.gz |
Merge "All-in-one Ironic service with a local RPC bus"
Diffstat (limited to 'ironic/conductor')
-rw-r--r-- | ironic/conductor/manager.py | 2 | ||||
-rw-r--r-- | ironic/conductor/rpcapi.py | 177 |
2 files changed, 122 insertions, 57 deletions
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py index 2797faa24..4935c5110 100644 --- a/ironic/conductor/manager.py +++ b/ironic/conductor/manager.py @@ -77,8 +77,6 @@ from ironic import objects from ironic.objects import base as objects_base from ironic.objects import fields -MANAGER_TOPIC = 'ironic.conductor_manager' - LOG = log.getLogger(__name__) METRICS = metrics_utils.get_metrics_logger(__name__) diff --git a/ironic/conductor/rpcapi.py b/ironic/conductor/rpcapi.py index 7d753b90e..6f4971be7 100644 --- a/ironic/conductor/rpcapi.py +++ b/ironic/conductor/rpcapi.py @@ -21,6 +21,7 @@ Client side of the conductor RPC API. import random from ironic_lib.json_rpc import client as json_rpc +from oslo_log import log import oslo_messaging as messaging from ironic.common import exception @@ -28,12 +29,54 @@ from ironic.common import hash_ring from ironic.common.i18n import _ from ironic.common import release_mappings as versions from ironic.common import rpc -from ironic.conductor import manager from ironic.conf import CONF from ironic.db import api as dbapi from ironic.objects import base as objects_base +LOG = log.getLogger(__name__) + + +class LocalContext: + """Context to make calls to a local conductor.""" + + __slots__ = () + + def call(self, context, rpc_call_name, **kwargs): + """Make a local conductor call.""" + if rpc.GLOBAL_MANAGER is None: + raise exception.ServiceUnavailable( + _("The built-in conductor is not available, it might have " + "crashed. Please check the logs and correct the " + "configuration, if required.")) + try: + return getattr(rpc.GLOBAL_MANAGER, rpc_call_name)(context, + **kwargs) + # FIXME(dtantsur): can we somehow avoid wrapping the exception? + except messaging.ExpectedException as exc: + exc_value, exc_tb = exc.exc_info[1:] + raise exc_value.with_traceback(exc_tb) from None + + def cast(self, context, rpc_call_name, **kwargs): + """Make a local conductor call. + + It is expected that the underlying call uses a thread to avoid + blocking the caller. + + Any exceptions are logged and ignored. + """ + try: + return self.call(context, rpc_call_name, **kwargs) + except Exception: + # In real RPC, casts are completely asynchronous and never return + # actual errors. + LOG.exception('Ignoring unhandled exception from RPC cast %s', + rpc_call_name) + + +_LOCAL_CONTEXT = LocalContext() + + class ConductorAPI(object): """Client side of the conductor RPC API. @@ -120,7 +163,7 @@ class ConductorAPI(object): super(ConductorAPI, self).__init__() self.topic = topic if self.topic is None: - self.topic = manager.MANAGER_TOPIC + self.topic = rpc.MANAGER_TOPIC serializer = objects_base.IronicObjectSerializer() release_ver = versions.RELEASE_MAPPING.get(CONF.pin_release_version) @@ -139,6 +182,30 @@ class ConductorAPI(object): # NOTE(tenbrae): this is going to be buggy self.ring_manager = hash_ring.HashRingManager() + def _prepare_call(self, topic, version=None): + """Prepare an RPC call. + + If a conductor exists in the same process, a direct function call + is used instead of real RPC. + + :param topic: RPC topic to send to. + :param version: RPC API version to require. + """ + # FIXME(dtantsur): this doesn't work with either JSON RPC or local + # conductor. Do we even need this fallback? + topic = topic or self.topic + # Normally a topic is a <topic prefix>.<hostname>, we need to extract + # the hostname to match it against the current host. + host = topic[len(self.topic) + 1:] + + if rpc.GLOBAL_MANAGER is not None and host == CONF.host: + # Short-cut to a local function call if there is a built-in + # conductor. + return _LOCAL_CONTEXT + + # Normal RPC path + return self.client.prepare(topic=topic, version=version) + def get_conductor_for(self, node): """Get the conductor which the node is mapped to. @@ -231,7 +298,7 @@ class ConductorAPI(object): :raises: NoValidDefaultForInterface if no default can be calculated for some interfaces, and explicit values must be provided. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.36') + cctxt = self._prepare_call(topic=topic, version='1.36') return cctxt.call(context, 'create_node', node_obj=node_obj) def update_node(self, context, node_obj, topic=None, @@ -257,7 +324,7 @@ class ConductorAPI(object): for some interfaces, and explicit values must be provided. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.1') + cctxt = self._prepare_call(topic=topic, version='1.1') return cctxt.call(context, 'update_node', node_obj=node_obj, reset_interfaces=reset_interfaces) @@ -278,7 +345,7 @@ class ConductorAPI(object): async task. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.39') + cctxt = self._prepare_call(topic=topic, version='1.39') return cctxt.call(context, 'change_node_power_state', node_id=node_id, new_state=new_state, timeout=timeout) @@ -298,7 +365,7 @@ class ConductorAPI(object): async task. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.55') + cctxt = self._prepare_call(topic=topic, version='1.55') return cctxt.call(context, 'change_node_boot_mode', node_id=node_id, new_state=new_state) @@ -318,7 +385,7 @@ class ConductorAPI(object): async task. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.55') + cctxt = self._prepare_call(topic=topic, version='1.55') return cctxt.call(context, 'change_node_secure_boot', node_id=node_id, new_state=new_state) @@ -355,7 +422,7 @@ class ConductorAPI(object): or return it in the response body (False). """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.20') + cctxt = self._prepare_call(topic=topic, version='1.20') return cctxt.call(context, 'vendor_passthru', node_id=node_id, driver_method=driver_method, http_method=http_method, @@ -400,7 +467,7 @@ class ConductorAPI(object): or return it in the response body (False). """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.20') + cctxt = self._prepare_call(topic=topic, version='1.20') return cctxt.call(context, 'driver_vendor_passthru', driver_name=driver_name, driver_method=driver_method, @@ -416,7 +483,7 @@ class ConductorAPI(object): :returns: dictionary of <method name>:<method metadata> entries. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.21') + cctxt = self._prepare_call(topic=topic, version='1.21') return cctxt.call(context, 'get_node_vendor_passthru_methods', node_id=node_id) @@ -438,7 +505,7 @@ class ConductorAPI(object): :returns: dictionary of <method name>:<method metadata> entries. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.21') + cctxt = self._prepare_call(topic=topic, version='1.21') return cctxt.call(context, 'get_driver_vendor_passthru_methods', driver_name=driver_name) @@ -468,7 +535,7 @@ class ConductorAPI(object): version = '1.52' new_kws['deploy_steps'] = deploy_steps - cctxt = self.client.prepare(topic=topic or self.topic, version=version) + cctxt = self._prepare_call(topic=topic, version=version) return cctxt.call(context, 'do_node_deploy', node_id=node_id, rebuild=rebuild, configdrive=configdrive, **new_kws) @@ -488,7 +555,7 @@ class ConductorAPI(object): deployed state before this method is called. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.6') + cctxt = self._prepare_call(topic=topic, version='1.6') return cctxt.call(context, 'do_node_tear_down', node_id=node_id) def do_provisioning_action(self, context, node_id, action, topic=None): @@ -506,7 +573,7 @@ class ConductorAPI(object): This encapsulates some provisioning actions in a single call. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.23') + cctxt = self._prepare_call(topic=topic, version='1.23') return cctxt.call(context, 'do_provisioning_action', node_id=node_id, action=action) @@ -520,7 +587,7 @@ class ConductorAPI(object): :param node_id: node id or uuid. :param topic: RPC topic. Defaults to self.topic. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.27') + cctxt = self._prepare_call(topic=topic, version='1.27') return cctxt.cast(context, 'continue_node_clean', node_id=node_id) @@ -534,7 +601,7 @@ class ConductorAPI(object): :param node_id: node id or uuid. :param topic: RPC topic. Defaults to self.topic. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.45') + cctxt = self._prepare_call(topic=topic, version='1.45') return cctxt.cast(context, 'continue_node_deploy', node_id=node_id) @@ -548,7 +615,7 @@ class ConductorAPI(object): interface validation. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.5') + cctxt = self._prepare_call(topic=topic, version='1.5') return cctxt.call(context, 'validate_driver_interfaces', node_id=node_id) @@ -564,7 +631,7 @@ class ConductorAPI(object): :raises: InvalidState if the node is in the wrong provision state to perform deletion. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.9') + cctxt = self._prepare_call(topic=topic, version='1.9') return cctxt.call(context, 'destroy_node', node_id=node_id) def get_console_information(self, context, node_id, topic=None): @@ -578,7 +645,7 @@ class ConductorAPI(object): :raises: InvalidParameterValue when the wrong driver info is specified. :raises: MissingParameterValue if a required parameter is missing """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.11') + cctxt = self._prepare_call(topic=topic, version='1.11') return cctxt.call(context, 'get_console_information', node_id=node_id) def set_console_mode(self, context, node_id, enabled, topic=None): @@ -596,7 +663,7 @@ class ConductorAPI(object): :raises: NoFreeConductorWorker when there is no free worker to start async task. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.11') + cctxt = self._prepare_call(topic=topic, version='1.11') return cctxt.call(context, 'set_console_mode', node_id=node_id, enabled=enabled) @@ -612,7 +679,7 @@ class ConductorAPI(object): :param topic: RPC topic. Defaults to self.topic. :returns: created port object. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.41') + cctxt = self._prepare_call(topic=topic, version='1.41') return cctxt.call(context, 'create_port', port_obj=port_obj) def update_port(self, context, port_obj, topic=None): @@ -628,7 +695,7 @@ class ConductorAPI(object): :returns: updated port object, including all fields. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.13') + cctxt = self._prepare_call(topic=topic, version='1.13') return cctxt.call(context, 'update_port', port_obj=port_obj) def update_portgroup(self, context, portgroup_obj, topic=None): @@ -645,7 +712,7 @@ class ConductorAPI(object): :returns: updated portgroup object, including all fields. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.33') + cctxt = self._prepare_call(topic=topic, version='1.33') return cctxt.call(context, 'update_portgroup', portgroup_obj=portgroup_obj) @@ -660,7 +727,7 @@ class ConductorAPI(object): not exist. :raises: PortgroupNotEmpty if portgroup is not empty """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.33') + cctxt = self._prepare_call(topic=topic, version='1.33') return cctxt.call(context, 'destroy_portgroup', portgroup=portgroup) def get_driver_properties(self, context, driver_name, topic=None): @@ -674,7 +741,7 @@ class ConductorAPI(object): :raises: DriverNotFound. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.16') + cctxt = self._prepare_call(topic=topic, version='1.16') return cctxt.call(context, 'get_driver_properties', driver_name=driver_name) @@ -699,7 +766,7 @@ class ConductorAPI(object): specified or an invalid boot device is specified. :raises: MissingParameterValue if missing supplied info. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.17') + cctxt = self._prepare_call(topic=topic, version='1.17') return cctxt.call(context, 'set_boot_device', node_id=node_id, device=device, persistent=persistent) @@ -725,7 +792,7 @@ class ConductorAPI(object): future boots or not, None if it is unknown. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.17') + cctxt = self._prepare_call(topic=topic, version='1.17') return cctxt.call(context, 'get_boot_device', node_id=node_id) def inject_nmi(self, context, node_id, topic=None): @@ -745,7 +812,7 @@ class ConductorAPI(object): :raises: MissingParameterValue if missing supplied info. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.40') + cctxt = self._prepare_call(topic=topic, version='1.40') return cctxt.call(context, 'inject_nmi', node_id=node_id) def get_supported_boot_devices(self, context, node_id, topic=None): @@ -766,7 +833,7 @@ class ConductorAPI(object): in :mod:`ironic.common.boot_devices`. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.17') + cctxt = self._prepare_call(topic=topic, version='1.17') return cctxt.call(context, 'get_supported_boot_devices', node_id=node_id) @@ -791,7 +858,7 @@ class ConductorAPI(object): :raises: MissingParameterValue if missing supplied info. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.50') + cctxt = self._prepare_call(topic=topic, version='1.50') return cctxt.call(context, 'set_indicator_state', node_id=node_id, component=component, indicator=indicator, state=state) @@ -817,7 +884,7 @@ class ConductorAPI(object): mod:`ironic.common.indicator_states`. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.50') + cctxt = self._prepare_call(topic=topic, version='1.50') return cctxt.call(context, 'get_indicator_state', node_id=node_id, component=component, indicator=indicator) @@ -849,7 +916,7 @@ class ConductorAPI(object): } """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.50') + cctxt = self._prepare_call(topic=topic, version='1.50') return cctxt.call(context, 'get_supported_indicators', node_id=node_id, component=component) @@ -869,7 +936,7 @@ class ConductorAPI(object): action to do in the current state. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.24') + cctxt = self._prepare_call(topic=topic, version='1.24') return cctxt.call(context, 'inspect_hardware', node_id=node_id) def destroy_port(self, context, port, topic=None): @@ -882,7 +949,7 @@ class ConductorAPI(object): :raises: NodeNotFound if the node associated with the port does not exist. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.25') + cctxt = self._prepare_call(topic=topic, version='1.25') return cctxt.call(context, 'destroy_port', port=port) def set_target_raid_config(self, context, node_id, target_raid_config, @@ -904,7 +971,7 @@ class ConductorAPI(object): missing. :raises: NodeLocked if node is locked by another conductor. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.30') + cctxt = self._prepare_call(topic=topic, version='1.30') return cctxt.call(context, 'set_target_raid_config', node_id=node_id, target_raid_config=target_raid_config) @@ -929,7 +996,7 @@ class ConductorAPI(object): :returns: A dictionary containing the properties that can be mentioned for logical disks and a textual description for them. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.30') + cctxt = self._prepare_call(topic=topic, version='1.30') return cctxt.call(context, 'get_raid_logical_disk_properties', driver_name=driver_name) @@ -957,7 +1024,7 @@ class ConductorAPI(object): params['disable_ramdisk'] = disable_ramdisk version = '1.53' - cctxt = self.client.prepare(topic=topic or self.topic, version=version) + cctxt = self._prepare_call(topic=topic, version=version) return cctxt.call(context, 'do_node_clean', node_id=node_id, clean_steps=clean_steps, **params) @@ -993,7 +1060,7 @@ class ConductorAPI(object): version = '1.54' new_kws['agent_status'] = agent_status new_kws['agent_status_message'] = agent_status_message - cctxt = self.client.prepare(topic=topic or self.topic, version=version) + cctxt = self._prepare_call(topic=topic, version=version) return cctxt.call(context, 'heartbeat', node_id=node_id, callback_url=callback_url, **new_kws) @@ -1019,7 +1086,7 @@ class ConductorAPI(object): raise NotImplementedError(_('Incompatible conductor version - ' 'please upgrade ironic-conductor ' 'first')) - cctxt = self.client.prepare(topic=self.topic, version='1.31') + cctxt = self._prepare_call(topic=self.topic, version='1.31') return cctxt.call(context, 'object_class_action_versions', objname=objname, objmethod=objmethod, object_versions=object_versions, @@ -1045,7 +1112,7 @@ class ConductorAPI(object): raise NotImplementedError(_('Incompatible conductor version - ' 'please upgrade ironic-conductor ' 'first')) - cctxt = self.client.prepare(topic=self.topic, version='1.31') + cctxt = self._prepare_call(topic=self.topic, version='1.31') return cctxt.call(context, 'object_action', objinst=objinst, objmethod=objmethod, args=args, kwargs=kwargs) @@ -1070,7 +1137,7 @@ class ConductorAPI(object): raise NotImplementedError(_('Incompatible conductor version - ' 'please upgrade ironic-conductor ' 'first')) - cctxt = self.client.prepare(topic=self.topic, version='1.31') + cctxt = self._prepare_call(topic=self.topic, version='1.31') return cctxt.call(context, 'object_backport_versions', objinst=objinst, object_versions=object_versions) @@ -1089,7 +1156,7 @@ class ConductorAPI(object): :raises: VolumeConnectorNotFound if the volume connector cannot be found """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.35') + cctxt = self._prepare_call(topic=topic, version='1.35') return cctxt.call(context, 'destroy_volume_connector', connector=connector) @@ -1116,7 +1183,7 @@ class ConductorAPI(object): :returns: updated volume connector object, including all fields. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.35') + cctxt = self._prepare_call(topic=topic, version='1.35') return cctxt.call(context, 'update_volume_connector', connector=connector) @@ -1131,7 +1198,7 @@ class ConductorAPI(object): not exist :raises: VolumeTargetNotFound if the volume target cannot be found """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.37') + cctxt = self._prepare_call(topic=topic, version='1.37') return cctxt.call(context, 'destroy_volume_target', target=target) @@ -1156,7 +1223,7 @@ class ConductorAPI(object): :returns: updated volume target object, including all fields """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.37') + cctxt = self._prepare_call(topic=topic, version='1.37') return cctxt.call(context, 'update_volume_target', target=target) @@ -1174,7 +1241,7 @@ class ConductorAPI(object): :raises: InvalidParameterValue, if a parameter that's required for VIF attach is wrong/missing. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.38') + cctxt = self._prepare_call(topic=topic, version='1.38') return cctxt.call(context, 'vif_attach', node_id=node_id, vif_info=vif_info) @@ -1190,7 +1257,7 @@ class ConductorAPI(object): :raises: InvalidParameterValue, if a parameter that's required for VIF detach is wrong/missing. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.38') + cctxt = self._prepare_call(topic=topic, version='1.38') return cctxt.call(context, 'vif_detach', node_id=node_id, vif_id=vif_id) @@ -1206,7 +1273,7 @@ class ConductorAPI(object): :raises: InvalidParameterValue, if a parameter that's required for VIF list is wrong/missing. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.38') + cctxt = self._prepare_call(topic=topic, version='1.38') return cctxt.call(context, 'vif_list', node_id=node_id) def do_node_rescue(self, context, node_id, rescue_password, topic=None): @@ -1225,7 +1292,7 @@ class ConductorAPI(object): state before this method is called. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.43') + cctxt = self._prepare_call(topic=topic, version='1.43') return cctxt.call(context, 'do_node_rescue', node_id=node_id, rescue_password=rescue_password) @@ -1243,7 +1310,7 @@ class ConductorAPI(object): state before this method is called. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.43') + cctxt = self._prepare_call(topic=topic, version='1.43') return cctxt.call(context, 'do_node_unrescue', node_id=node_id) def add_node_traits(self, context, node_id, traits, replace=False, @@ -1260,7 +1327,7 @@ class ConductorAPI(object): :raises: NodeLocked if node is locked by another conductor. :raises: NodeNotFound if the node does not exist. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.44') + cctxt = self._prepare_call(topic=topic, version='1.44') return cctxt.call(context, 'add_node_traits', node_id=node_id, traits=traits, replace=replace) @@ -1276,7 +1343,7 @@ class ConductorAPI(object): :raises: NodeNotFound if the node does not exist. :raises: NodeTraitNotFound if one of the traits is not found. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.44') + cctxt = self._prepare_call(topic=topic, version='1.44') return cctxt.call(context, 'remove_node_traits', node_id=node_id, traits=traits) @@ -1287,7 +1354,7 @@ class ConductorAPI(object): :param allocation: an allocation object. :param topic: RPC topic. Defaults to self.topic. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.48') + cctxt = self._prepare_call(topic=topic, version='1.48') return cctxt.call(context, 'create_allocation', allocation=allocation) def destroy_allocation(self, context, allocation, topic=None): @@ -1299,7 +1366,7 @@ class ConductorAPI(object): :raises: InvalidState if the associated node is in the wrong provision state to perform deallocation. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.48') + cctxt = self._prepare_call(topic=topic, version='1.48') return cctxt.call(context, 'destroy_allocation', allocation=allocation) def get_node_with_token(self, context, node_id, topic=None): @@ -1312,5 +1379,5 @@ class ConductorAPI(object): :returns: A Node object with agent token. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.49') + cctxt = self._prepare_call(topic=topic, version='1.49') return cctxt.call(context, 'get_node_with_token', node_id=node_id) |