summaryrefslogtreecommitdiff
path: root/ironic/conductor
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2021-12-08 16:35:54 +0000
committerGerrit Code Review <review@openstack.org>2021-12-08 16:35:54 +0000
commitc4d2f4b8c63d86233fdf863ae4c4fa21d95df65c (patch)
treef971b7e1610e6f00651846183a59ef01cec0f658 /ironic/conductor
parentb37ee7c91133d4970208f14833d8f732de9b5911 (diff)
parent9a6f2d101ba734126b3e7f0d475b747ebcb84b62 (diff)
downloadironic-c4d2f4b8c63d86233fdf863ae4c4fa21d95df65c.tar.gz
Merge "All-in-one Ironic service with a local RPC bus"
Diffstat (limited to 'ironic/conductor')
-rw-r--r--ironic/conductor/manager.py2
-rw-r--r--ironic/conductor/rpcapi.py177
2 files changed, 122 insertions, 57 deletions
diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py
index 2797faa24..4935c5110 100644
--- a/ironic/conductor/manager.py
+++ b/ironic/conductor/manager.py
@@ -77,8 +77,6 @@ from ironic import objects
from ironic.objects import base as objects_base
from ironic.objects import fields
-MANAGER_TOPIC = 'ironic.conductor_manager'
-
LOG = log.getLogger(__name__)
METRICS = metrics_utils.get_metrics_logger(__name__)
diff --git a/ironic/conductor/rpcapi.py b/ironic/conductor/rpcapi.py
index 7d753b90e..6f4971be7 100644
--- a/ironic/conductor/rpcapi.py
+++ b/ironic/conductor/rpcapi.py
@@ -21,6 +21,7 @@ Client side of the conductor RPC API.
import random
from ironic_lib.json_rpc import client as json_rpc
+from oslo_log import log
import oslo_messaging as messaging
from ironic.common import exception
@@ -28,12 +29,54 @@ from ironic.common import hash_ring
from ironic.common.i18n import _
from ironic.common import release_mappings as versions
from ironic.common import rpc
-from ironic.conductor import manager
from ironic.conf import CONF
from ironic.db import api as dbapi
from ironic.objects import base as objects_base
+LOG = log.getLogger(__name__)
+
+
+class LocalContext:
+ """Context to make calls to a local conductor."""
+
+ __slots__ = ()
+
+ def call(self, context, rpc_call_name, **kwargs):
+ """Make a local conductor call."""
+ if rpc.GLOBAL_MANAGER is None:
+ raise exception.ServiceUnavailable(
+ _("The built-in conductor is not available, it might have "
+ "crashed. Please check the logs and correct the "
+ "configuration, if required."))
+ try:
+ return getattr(rpc.GLOBAL_MANAGER, rpc_call_name)(context,
+ **kwargs)
+ # FIXME(dtantsur): can we somehow avoid wrapping the exception?
+ except messaging.ExpectedException as exc:
+ exc_value, exc_tb = exc.exc_info[1:]
+ raise exc_value.with_traceback(exc_tb) from None
+
+ def cast(self, context, rpc_call_name, **kwargs):
+ """Make a local conductor call.
+
+ It is expected that the underlying call uses a thread to avoid
+ blocking the caller.
+
+ Any exceptions are logged and ignored.
+ """
+ try:
+ return self.call(context, rpc_call_name, **kwargs)
+ except Exception:
+ # In real RPC, casts are completely asynchronous and never return
+ # actual errors.
+ LOG.exception('Ignoring unhandled exception from RPC cast %s',
+ rpc_call_name)
+
+
+_LOCAL_CONTEXT = LocalContext()
+
+
class ConductorAPI(object):
"""Client side of the conductor RPC API.
@@ -120,7 +163,7 @@ class ConductorAPI(object):
super(ConductorAPI, self).__init__()
self.topic = topic
if self.topic is None:
- self.topic = manager.MANAGER_TOPIC
+ self.topic = rpc.MANAGER_TOPIC
serializer = objects_base.IronicObjectSerializer()
release_ver = versions.RELEASE_MAPPING.get(CONF.pin_release_version)
@@ -139,6 +182,30 @@ class ConductorAPI(object):
# NOTE(tenbrae): this is going to be buggy
self.ring_manager = hash_ring.HashRingManager()
+ def _prepare_call(self, topic, version=None):
+ """Prepare an RPC call.
+
+ If a conductor exists in the same process, a direct function call
+ is used instead of real RPC.
+
+ :param topic: RPC topic to send to.
+ :param version: RPC API version to require.
+ """
+ # FIXME(dtantsur): this doesn't work with either JSON RPC or local
+ # conductor. Do we even need this fallback?
+ topic = topic or self.topic
+ # Normally a topic is a <topic prefix>.<hostname>, we need to extract
+ # the hostname to match it against the current host.
+ host = topic[len(self.topic) + 1:]
+
+ if rpc.GLOBAL_MANAGER is not None and host == CONF.host:
+ # Short-cut to a local function call if there is a built-in
+ # conductor.
+ return _LOCAL_CONTEXT
+
+ # Normal RPC path
+ return self.client.prepare(topic=topic, version=version)
+
def get_conductor_for(self, node):
"""Get the conductor which the node is mapped to.
@@ -231,7 +298,7 @@ class ConductorAPI(object):
:raises: NoValidDefaultForInterface if no default can be calculated
for some interfaces, and explicit values must be provided.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.36')
+ cctxt = self._prepare_call(topic=topic, version='1.36')
return cctxt.call(context, 'create_node', node_obj=node_obj)
def update_node(self, context, node_obj, topic=None,
@@ -257,7 +324,7 @@ class ConductorAPI(object):
for some interfaces, and explicit values must be provided.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.1')
+ cctxt = self._prepare_call(topic=topic, version='1.1')
return cctxt.call(context, 'update_node', node_obj=node_obj,
reset_interfaces=reset_interfaces)
@@ -278,7 +345,7 @@ class ConductorAPI(object):
async task.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.39')
+ cctxt = self._prepare_call(topic=topic, version='1.39')
return cctxt.call(context, 'change_node_power_state', node_id=node_id,
new_state=new_state, timeout=timeout)
@@ -298,7 +365,7 @@ class ConductorAPI(object):
async task.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.55')
+ cctxt = self._prepare_call(topic=topic, version='1.55')
return cctxt.call(context, 'change_node_boot_mode', node_id=node_id,
new_state=new_state)
@@ -318,7 +385,7 @@ class ConductorAPI(object):
async task.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.55')
+ cctxt = self._prepare_call(topic=topic, version='1.55')
return cctxt.call(context, 'change_node_secure_boot', node_id=node_id,
new_state=new_state)
@@ -355,7 +422,7 @@ class ConductorAPI(object):
or return it in the response body (False).
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.20')
+ cctxt = self._prepare_call(topic=topic, version='1.20')
return cctxt.call(context, 'vendor_passthru', node_id=node_id,
driver_method=driver_method,
http_method=http_method,
@@ -400,7 +467,7 @@ class ConductorAPI(object):
or return it in the response body (False).
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.20')
+ cctxt = self._prepare_call(topic=topic, version='1.20')
return cctxt.call(context, 'driver_vendor_passthru',
driver_name=driver_name,
driver_method=driver_method,
@@ -416,7 +483,7 @@ class ConductorAPI(object):
:returns: dictionary of <method name>:<method metadata> entries.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.21')
+ cctxt = self._prepare_call(topic=topic, version='1.21')
return cctxt.call(context, 'get_node_vendor_passthru_methods',
node_id=node_id)
@@ -438,7 +505,7 @@ class ConductorAPI(object):
:returns: dictionary of <method name>:<method metadata> entries.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.21')
+ cctxt = self._prepare_call(topic=topic, version='1.21')
return cctxt.call(context, 'get_driver_vendor_passthru_methods',
driver_name=driver_name)
@@ -468,7 +535,7 @@ class ConductorAPI(object):
version = '1.52'
new_kws['deploy_steps'] = deploy_steps
- cctxt = self.client.prepare(topic=topic or self.topic, version=version)
+ cctxt = self._prepare_call(topic=topic, version=version)
return cctxt.call(context, 'do_node_deploy', node_id=node_id,
rebuild=rebuild, configdrive=configdrive, **new_kws)
@@ -488,7 +555,7 @@ class ConductorAPI(object):
deployed state before this method is called.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.6')
+ cctxt = self._prepare_call(topic=topic, version='1.6')
return cctxt.call(context, 'do_node_tear_down', node_id=node_id)
def do_provisioning_action(self, context, node_id, action, topic=None):
@@ -506,7 +573,7 @@ class ConductorAPI(object):
This encapsulates some provisioning actions in a single call.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.23')
+ cctxt = self._prepare_call(topic=topic, version='1.23')
return cctxt.call(context, 'do_provisioning_action',
node_id=node_id, action=action)
@@ -520,7 +587,7 @@ class ConductorAPI(object):
:param node_id: node id or uuid.
:param topic: RPC topic. Defaults to self.topic.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.27')
+ cctxt = self._prepare_call(topic=topic, version='1.27')
return cctxt.cast(context, 'continue_node_clean',
node_id=node_id)
@@ -534,7 +601,7 @@ class ConductorAPI(object):
:param node_id: node id or uuid.
:param topic: RPC topic. Defaults to self.topic.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.45')
+ cctxt = self._prepare_call(topic=topic, version='1.45')
return cctxt.cast(context, 'continue_node_deploy',
node_id=node_id)
@@ -548,7 +615,7 @@ class ConductorAPI(object):
interface validation.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.5')
+ cctxt = self._prepare_call(topic=topic, version='1.5')
return cctxt.call(context, 'validate_driver_interfaces',
node_id=node_id)
@@ -564,7 +631,7 @@ class ConductorAPI(object):
:raises: InvalidState if the node is in the wrong provision
state to perform deletion.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.9')
+ cctxt = self._prepare_call(topic=topic, version='1.9')
return cctxt.call(context, 'destroy_node', node_id=node_id)
def get_console_information(self, context, node_id, topic=None):
@@ -578,7 +645,7 @@ class ConductorAPI(object):
:raises: InvalidParameterValue when the wrong driver info is specified.
:raises: MissingParameterValue if a required parameter is missing
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.11')
+ cctxt = self._prepare_call(topic=topic, version='1.11')
return cctxt.call(context, 'get_console_information', node_id=node_id)
def set_console_mode(self, context, node_id, enabled, topic=None):
@@ -596,7 +663,7 @@ class ConductorAPI(object):
:raises: NoFreeConductorWorker when there is no free worker to start
async task.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.11')
+ cctxt = self._prepare_call(topic=topic, version='1.11')
return cctxt.call(context, 'set_console_mode', node_id=node_id,
enabled=enabled)
@@ -612,7 +679,7 @@ class ConductorAPI(object):
:param topic: RPC topic. Defaults to self.topic.
:returns: created port object.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.41')
+ cctxt = self._prepare_call(topic=topic, version='1.41')
return cctxt.call(context, 'create_port', port_obj=port_obj)
def update_port(self, context, port_obj, topic=None):
@@ -628,7 +695,7 @@ class ConductorAPI(object):
:returns: updated port object, including all fields.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.13')
+ cctxt = self._prepare_call(topic=topic, version='1.13')
return cctxt.call(context, 'update_port', port_obj=port_obj)
def update_portgroup(self, context, portgroup_obj, topic=None):
@@ -645,7 +712,7 @@ class ConductorAPI(object):
:returns: updated portgroup object, including all fields.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.33')
+ cctxt = self._prepare_call(topic=topic, version='1.33')
return cctxt.call(context, 'update_portgroup',
portgroup_obj=portgroup_obj)
@@ -660,7 +727,7 @@ class ConductorAPI(object):
not exist.
:raises: PortgroupNotEmpty if portgroup is not empty
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.33')
+ cctxt = self._prepare_call(topic=topic, version='1.33')
return cctxt.call(context, 'destroy_portgroup', portgroup=portgroup)
def get_driver_properties(self, context, driver_name, topic=None):
@@ -674,7 +741,7 @@ class ConductorAPI(object):
:raises: DriverNotFound.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.16')
+ cctxt = self._prepare_call(topic=topic, version='1.16')
return cctxt.call(context, 'get_driver_properties',
driver_name=driver_name)
@@ -699,7 +766,7 @@ class ConductorAPI(object):
specified or an invalid boot device is specified.
:raises: MissingParameterValue if missing supplied info.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.17')
+ cctxt = self._prepare_call(topic=topic, version='1.17')
return cctxt.call(context, 'set_boot_device', node_id=node_id,
device=device, persistent=persistent)
@@ -725,7 +792,7 @@ class ConductorAPI(object):
future boots or not, None if it is unknown.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.17')
+ cctxt = self._prepare_call(topic=topic, version='1.17')
return cctxt.call(context, 'get_boot_device', node_id=node_id)
def inject_nmi(self, context, node_id, topic=None):
@@ -745,7 +812,7 @@ class ConductorAPI(object):
:raises: MissingParameterValue if missing supplied info.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.40')
+ cctxt = self._prepare_call(topic=topic, version='1.40')
return cctxt.call(context, 'inject_nmi', node_id=node_id)
def get_supported_boot_devices(self, context, node_id, topic=None):
@@ -766,7 +833,7 @@ class ConductorAPI(object):
in :mod:`ironic.common.boot_devices`.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.17')
+ cctxt = self._prepare_call(topic=topic, version='1.17')
return cctxt.call(context, 'get_supported_boot_devices',
node_id=node_id)
@@ -791,7 +858,7 @@ class ConductorAPI(object):
:raises: MissingParameterValue if missing supplied info.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.50')
+ cctxt = self._prepare_call(topic=topic, version='1.50')
return cctxt.call(context, 'set_indicator_state', node_id=node_id,
component=component, indicator=indicator,
state=state)
@@ -817,7 +884,7 @@ class ConductorAPI(object):
mod:`ironic.common.indicator_states`.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.50')
+ cctxt = self._prepare_call(topic=topic, version='1.50')
return cctxt.call(context, 'get_indicator_state', node_id=node_id,
component=component, indicator=indicator)
@@ -849,7 +916,7 @@ class ConductorAPI(object):
}
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.50')
+ cctxt = self._prepare_call(topic=topic, version='1.50')
return cctxt.call(context, 'get_supported_indicators', node_id=node_id,
component=component)
@@ -869,7 +936,7 @@ class ConductorAPI(object):
action to do in the current state.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.24')
+ cctxt = self._prepare_call(topic=topic, version='1.24')
return cctxt.call(context, 'inspect_hardware', node_id=node_id)
def destroy_port(self, context, port, topic=None):
@@ -882,7 +949,7 @@ class ConductorAPI(object):
:raises: NodeNotFound if the node associated with the port does not
exist.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.25')
+ cctxt = self._prepare_call(topic=topic, version='1.25')
return cctxt.call(context, 'destroy_port', port=port)
def set_target_raid_config(self, context, node_id, target_raid_config,
@@ -904,7 +971,7 @@ class ConductorAPI(object):
missing.
:raises: NodeLocked if node is locked by another conductor.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.30')
+ cctxt = self._prepare_call(topic=topic, version='1.30')
return cctxt.call(context, 'set_target_raid_config',
node_id=node_id,
target_raid_config=target_raid_config)
@@ -929,7 +996,7 @@ class ConductorAPI(object):
:returns: A dictionary containing the properties that can be mentioned
for logical disks and a textual description for them.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.30')
+ cctxt = self._prepare_call(topic=topic, version='1.30')
return cctxt.call(context, 'get_raid_logical_disk_properties',
driver_name=driver_name)
@@ -957,7 +1024,7 @@ class ConductorAPI(object):
params['disable_ramdisk'] = disable_ramdisk
version = '1.53'
- cctxt = self.client.prepare(topic=topic or self.topic, version=version)
+ cctxt = self._prepare_call(topic=topic, version=version)
return cctxt.call(context, 'do_node_clean',
node_id=node_id, clean_steps=clean_steps, **params)
@@ -993,7 +1060,7 @@ class ConductorAPI(object):
version = '1.54'
new_kws['agent_status'] = agent_status
new_kws['agent_status_message'] = agent_status_message
- cctxt = self.client.prepare(topic=topic or self.topic, version=version)
+ cctxt = self._prepare_call(topic=topic, version=version)
return cctxt.call(context, 'heartbeat', node_id=node_id,
callback_url=callback_url, **new_kws)
@@ -1019,7 +1086,7 @@ class ConductorAPI(object):
raise NotImplementedError(_('Incompatible conductor version - '
'please upgrade ironic-conductor '
'first'))
- cctxt = self.client.prepare(topic=self.topic, version='1.31')
+ cctxt = self._prepare_call(topic=self.topic, version='1.31')
return cctxt.call(context, 'object_class_action_versions',
objname=objname, objmethod=objmethod,
object_versions=object_versions,
@@ -1045,7 +1112,7 @@ class ConductorAPI(object):
raise NotImplementedError(_('Incompatible conductor version - '
'please upgrade ironic-conductor '
'first'))
- cctxt = self.client.prepare(topic=self.topic, version='1.31')
+ cctxt = self._prepare_call(topic=self.topic, version='1.31')
return cctxt.call(context, 'object_action', objinst=objinst,
objmethod=objmethod, args=args, kwargs=kwargs)
@@ -1070,7 +1137,7 @@ class ConductorAPI(object):
raise NotImplementedError(_('Incompatible conductor version - '
'please upgrade ironic-conductor '
'first'))
- cctxt = self.client.prepare(topic=self.topic, version='1.31')
+ cctxt = self._prepare_call(topic=self.topic, version='1.31')
return cctxt.call(context, 'object_backport_versions', objinst=objinst,
object_versions=object_versions)
@@ -1089,7 +1156,7 @@ class ConductorAPI(object):
:raises: VolumeConnectorNotFound if the volume connector cannot be
found
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.35')
+ cctxt = self._prepare_call(topic=topic, version='1.35')
return cctxt.call(context, 'destroy_volume_connector',
connector=connector)
@@ -1116,7 +1183,7 @@ class ConductorAPI(object):
:returns: updated volume connector object, including all fields.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.35')
+ cctxt = self._prepare_call(topic=topic, version='1.35')
return cctxt.call(context, 'update_volume_connector',
connector=connector)
@@ -1131,7 +1198,7 @@ class ConductorAPI(object):
not exist
:raises: VolumeTargetNotFound if the volume target cannot be found
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.37')
+ cctxt = self._prepare_call(topic=topic, version='1.37')
return cctxt.call(context, 'destroy_volume_target',
target=target)
@@ -1156,7 +1223,7 @@ class ConductorAPI(object):
:returns: updated volume target object, including all fields
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.37')
+ cctxt = self._prepare_call(topic=topic, version='1.37')
return cctxt.call(context, 'update_volume_target',
target=target)
@@ -1174,7 +1241,7 @@ class ConductorAPI(object):
:raises: InvalidParameterValue, if a parameter that's required for
VIF attach is wrong/missing.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.38')
+ cctxt = self._prepare_call(topic=topic, version='1.38')
return cctxt.call(context, 'vif_attach', node_id=node_id,
vif_info=vif_info)
@@ -1190,7 +1257,7 @@ class ConductorAPI(object):
:raises: InvalidParameterValue, if a parameter that's required for
VIF detach is wrong/missing.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.38')
+ cctxt = self._prepare_call(topic=topic, version='1.38')
return cctxt.call(context, 'vif_detach', node_id=node_id,
vif_id=vif_id)
@@ -1206,7 +1273,7 @@ class ConductorAPI(object):
:raises: InvalidParameterValue, if a parameter that's required for
VIF list is wrong/missing.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.38')
+ cctxt = self._prepare_call(topic=topic, version='1.38')
return cctxt.call(context, 'vif_list', node_id=node_id)
def do_node_rescue(self, context, node_id, rescue_password, topic=None):
@@ -1225,7 +1292,7 @@ class ConductorAPI(object):
state before this method is called.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.43')
+ cctxt = self._prepare_call(topic=topic, version='1.43')
return cctxt.call(context, 'do_node_rescue', node_id=node_id,
rescue_password=rescue_password)
@@ -1243,7 +1310,7 @@ class ConductorAPI(object):
state before this method is called.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.43')
+ cctxt = self._prepare_call(topic=topic, version='1.43')
return cctxt.call(context, 'do_node_unrescue', node_id=node_id)
def add_node_traits(self, context, node_id, traits, replace=False,
@@ -1260,7 +1327,7 @@ class ConductorAPI(object):
:raises: NodeLocked if node is locked by another conductor.
:raises: NodeNotFound if the node does not exist.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.44')
+ cctxt = self._prepare_call(topic=topic, version='1.44')
return cctxt.call(context, 'add_node_traits', node_id=node_id,
traits=traits, replace=replace)
@@ -1276,7 +1343,7 @@ class ConductorAPI(object):
:raises: NodeNotFound if the node does not exist.
:raises: NodeTraitNotFound if one of the traits is not found.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.44')
+ cctxt = self._prepare_call(topic=topic, version='1.44')
return cctxt.call(context, 'remove_node_traits', node_id=node_id,
traits=traits)
@@ -1287,7 +1354,7 @@ class ConductorAPI(object):
:param allocation: an allocation object.
:param topic: RPC topic. Defaults to self.topic.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.48')
+ cctxt = self._prepare_call(topic=topic, version='1.48')
return cctxt.call(context, 'create_allocation', allocation=allocation)
def destroy_allocation(self, context, allocation, topic=None):
@@ -1299,7 +1366,7 @@ class ConductorAPI(object):
:raises: InvalidState if the associated node is in the wrong provision
state to perform deallocation.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.48')
+ cctxt = self._prepare_call(topic=topic, version='1.48')
return cctxt.call(context, 'destroy_allocation', allocation=allocation)
def get_node_with_token(self, context, node_id, topic=None):
@@ -1312,5 +1379,5 @@ class ConductorAPI(object):
:returns: A Node object with agent token.
"""
- cctxt = self.client.prepare(topic=topic or self.topic, version='1.49')
+ cctxt = self._prepare_call(topic=topic, version='1.49')
return cctxt.call(context, 'get_node_with_token', node_id=node_id)