summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJulia Kreger <juliaashleykreger@gmail.com>2022-10-24 09:18:13 -0700
committerJulia Kreger <juliaashleykreger@gmail.com>2022-10-27 10:07:41 -0700
commit5d5ae595380ac506eeb6a369e26efaed338790d1 (patch)
treed3b47eac474fe0bd8d28803535d779531ec5a6f5
parent58fd2425f60375cef0bac2d645ba80f342cae0a6 (diff)
downloadironic-5d5ae595380ac506eeb6a369e26efaed338790d1.tar.gz
Replace more instances of model_query
The model_query call results in a nested read transaction, that does not seem to play well with SQLite support. Since it's inherently relying on the query style deprecated in SQLAlchemy 2.0, we need to migrate away from this call. As an intermediate step, change instances of model_query to session.query, making sure every call creates a session that lives as long as is needed to fetch the results. Removes a unit test which was built around creating a fake deadlock condition to test that oslo_db was working as expected. It's interaction was totally mocked, and in retooling the base method there was no easy to keep the same test logic around. Co-Authored-By: Dmitry Tantsur <dtantsur@protonmail.com> Change-Id: Ic8b1d964f7be5784e01c89bfb6c0277ea82eec2d
-rw-r--r--ironic/db/sqlalchemy/api.py499
-rw-r--r--ironic/tests/unit/db/test_conductor.py15
2 files changed, 226 insertions, 288 deletions
diff --git a/ironic/db/sqlalchemy/api.py b/ironic/db/sqlalchemy/api.py
index b05af3637..7902e9634 100644
--- a/ironic/db/sqlalchemy/api.py
+++ b/ironic/db/sqlalchemy/api.py
@@ -34,7 +34,6 @@ from osprofiler import sqlalchemy as osp_sqlalchemy
import sqlalchemy as sa
from sqlalchemy import or_
from sqlalchemy.exc import NoResultFound, MultipleResultsFound
-from sqlalchemy.orm import joinedload
from sqlalchemy.orm import Load
from sqlalchemy.orm import selectinload
from sqlalchemy import sql
@@ -86,63 +85,6 @@ def _wrap_session(session):
return session
-def _get_node_query_with_all_for_single_node():
- """Return a query object for the Node joined with all relevant fields.
-
- Deprecated: This method, while useful, returns a "Legacy Query" object
- which, while useful is considered a legacy object from SQLAlchemy
- which at some point may be removed. SQLAlchemy encourages all users
- to move to the unified ORM/Core Select interface.
-
- This method utilizes a joined load query which creates a result set
- where corresponding traits, and tags, are joined together in the result
- set.
-
- This is more efficent from a Queries Per Second standpoint with the
- backend database, as they are not separate distinct queries which
- are being executed by the client.
-
- The downside of this, is the relationship of tags and traits to nodes
- is that there may be multiple tags and traits for each node. Ultimately
- this style of query forces SQLAlchemy to de-duplicate the result set
- because the database returns the nodes portion of the result set for
- every trait, tag, or other table field the query is joined with.
- This looks like:
-
- node1, tag1, trait1
- node1, tag1, trait2
- node1, tag1, trait3
- node1, tag2, trait1
-
- Et cetra, to create:
-
- node1, [tag1, tag2], [trait1, trait 2, trait3]
-
- Where joins are super in-efficent for Ironic, is where nodes are being
- enumerated, as the above result set pattern is not just for one node, but
- potentially thousands of nodes. Please consider using _get_node_select
- which results in a primary query for the nodes, and then performs
- additional targeted queries for the joined tables, as opposed to
- performing client side de-duplication.
-
- :returns: a query object.
- """
- # NOTE(TheJulia): This *likely* ought to be selectinload, however
- # it is a very common hit pattern for Ironic to query just the node.
- # In those sorts of locations, the performance issues are less noticable
- # to end users. *IF/WHEN* we change it to be selectinload for nodes,
- # the resulting DB load will see a queries per second increase, which
- # we should be careful about.
-
- # NOTE(TheJulia): Basic benchmark difference
- # Test data creation: 67.202 seconds.
- # 2.43 seconds to obtain all nodes from SQLAlchemy (10k nodes)
- # 5.15 seconds to obtain all nodes *and* have node objects (10k nodes)
- return (model_query(models.Node)
- .options(joinedload(models.Node.tags))
- .options(joinedload(models.Node.traits)))
-
-
def _get_node_select():
"""Returns a SQLAlchemy Select Object for Nodes.
@@ -182,15 +124,6 @@ def _get_deploy_template_select_with_steps():
).options(selectinload(models.DeployTemplate.steps))
-def _get_deploy_template_query_with_steps():
- """Return a query object for the DeployTemplate joined with steps.
-
- :returns: a query object.
- """
- return model_query(models.DeployTemplate).options(
- selectinload(models.DeployTemplate.steps))
-
-
def model_query(model, *args, **kwargs):
"""Query helper for simpler session usage.
@@ -367,7 +300,7 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None,
# NOTE(TheJulia): We can't just ask for the bool of query if it is
# populated, so we need to ask if it is None.
if query is None:
- query = model_query(model)
+ query = sa.select(model)
sort_keys = ['id']
if sort_key and sort_key not in sort_keys:
sort_keys.insert(0, sort_key)
@@ -385,6 +318,12 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None,
if isinstance(query, sa_orm.Query):
# The classic "Legacy" ORM query object result set which is
# deprecated in advance of SQLAlchemy 2.0.
+ # TODO(TheJulia): Calls of this style basically need to be
+ # eliminated in ironic as returning this way does not allow
+ # commit or rollback in enginefacade to occur until the returned
+ # object is garbage collected as ORM Query objects allow
+ # for DB interactions to occur after the fact, so it remains
+ # connected to the DB..
return query.all()
else:
# In this case, we have a sqlalchemy.sql.selectable.Select
@@ -653,19 +592,20 @@ class Connection(api.Connection):
raise exception.NodeNotFound(
_("Nodes cannot be found: %s") % ', '.join(missing))
- query = model_query(models.Node.uuid, models.Node.name).filter(
- sql.or_(models.Node.uuid.in_(uuids),
- models.Node.name.in_(names))
- )
- if project:
- query = query.filter((models.Node.owner == project)
- | (models.Node.lessee == project))
+ with _session_for_read() as session:
+ query = session.query(models.Node.uuid, models.Node.name).filter(
+ sql.or_(models.Node.uuid.in_(uuids),
+ models.Node.name.in_(names))
+ )
+ if project:
+ query = query.filter((models.Node.owner == project)
+ | (models.Node.lessee == project))
- for row in query:
- if row[0] in idents:
- mapping[row[0]] = row[0]
- if row[1] and row[1] in idents:
- mapping[row[1]] = row[0]
+ for row in query:
+ if row[0] in idents:
+ mapping[row[0]] = row[0]
+ if row[1] and row[1] in idents:
+ mapping[row[1]] = row[0]
missing = idents - set(mapping)
if missing:
@@ -707,14 +647,14 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def reserve_node(self, tag, node_id):
- with _session_for_read():
+ with _session_for_read() as session:
try:
# TODO(TheJulia): Figure out a good way to query
# this so that we do it as light as possible without
# the full object invocation, which will speed lock
# activities. Granted, this is all at the DB level
# so maybe that is okay in the grand scheme of things.
- query = model_query(models.Node)
+ query = session.query(models.Node)
query = add_identity_filter(query, node_id)
node = query.one()
except NoResultFound:
@@ -729,9 +669,9 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def release_node(self, tag, node_id):
- with _session_for_read():
+ with _session_for_read() as session:
try:
- query = model_query(models.Node)
+ query = session.query(models.Node)
query = add_identity_filter(query, node_id)
node = query.one()
except NoResultFound:
@@ -859,7 +799,7 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_node(self, node_id):
with _session_for_write() as session:
- query = model_query(models.Node)
+ query = session.query(models.Node)
query = add_identity_filter(query, node_id)
try:
@@ -877,44 +817,45 @@ class Connection(api.Connection):
if uuidutils.is_uuid_like(node_id):
node_id = node_ref['id']
- port_query = model_query(models.Port)
+ port_query = session.query(models.Port)
port_query = add_port_filter_by_node(port_query, node_id)
port_query.delete()
- portgroup_query = model_query(models.Portgroup)
+ portgroup_query = session.query(models.Portgroup)
portgroup_query = add_portgroup_filter_by_node(portgroup_query,
node_id)
portgroup_query.delete()
# Delete all tags attached to the node
- tag_query = model_query(models.NodeTag).filter_by(node_id=node_id)
+ tag_query = session.query(models.NodeTag).filter_by(
+ node_id=node_id)
tag_query.delete()
# Delete all traits attached to the node
- trait_query = model_query(
+ trait_query = session.query(
models.NodeTrait).filter_by(node_id=node_id)
trait_query.delete()
- volume_connector_query = model_query(
+ volume_connector_query = session.query(
models.VolumeConnector).filter_by(node_id=node_id)
volume_connector_query.delete()
- volume_target_query = model_query(
+ volume_target_query = session.query(
models.VolumeTarget).filter_by(node_id=node_id)
volume_target_query.delete()
# delete all bios attached to the node
- bios_settings_query = model_query(
+ bios_settings_query = session.query(
models.BIOSSetting).filter_by(node_id=node_id)
bios_settings_query.delete()
# delete all allocations for this node
- allocation_query = model_query(
+ allocation_query = session.query(
models.Allocation).filter_by(node_id=node_id)
allocation_query.delete()
# delete all history for this node
- history_query = model_query(
+ history_query = session.query(
models.NodeHistory).filter_by(node_id=node_id)
history_query.delete()
@@ -942,10 +883,10 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def _do_update_node(self, node_id, values):
- with _session_for_write():
+ with _session_for_write() as session:
# NOTE(mgoddard): Don't issue a joined query for the update as this
# does not work with PostgreSQL.
- query = model_query(models.Node)
+ query = session.query(models.Node)
query = add_identity_filter(query, node_id)
try:
ref = query.with_for_update().one()
@@ -1069,7 +1010,7 @@ class Connection(api.Connection):
raise exception.InvalidParameterValue(err=msg)
try:
with _session_for_write() as session:
- query = model_query(models.Port)
+ query = session.query(models.Port)
query = add_port_filter(query, port_id)
ref = query.one()
ref.update(values)
@@ -1085,8 +1026,8 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_port(self, port_id):
- with _session_for_write():
- query = model_query(models.Port)
+ with _session_for_write() as session:
+ query = session.query(models.Port)
query = add_port_filter(query, port_id)
count = query.delete()
if count == 0:
@@ -1126,7 +1067,7 @@ class Connection(api.Connection):
def get_portgroup_list(self, limit=None, marker=None,
sort_key=None, sort_dir=None, project=None):
- query = model_query(models.Portgroup)
+ query = sa.select(models.Portgroup)
if project:
query = add_portgroup_filter_by_node_project(query, project)
return _paginate_query(models.Portgroup, limit, marker,
@@ -1134,7 +1075,7 @@ class Connection(api.Connection):
def get_portgroups_by_node_id(self, node_id, limit=None, marker=None,
sort_key=None, sort_dir=None, project=None):
- query = model_query(models.Portgroup)
+ query = sa.select(models.Portgroup)
query = query.where(models.Portgroup.node_id == node_id)
if project:
query = add_portgroup_filter_by_node_project(query, project)
@@ -1171,7 +1112,7 @@ class Connection(api.Connection):
with _session_for_write() as session:
try:
- query = model_query(models.Portgroup)
+ query = session.query(models.Portgroup)
query = add_portgroup_filter(query, portgroup_id)
ref = query.one()
ref.update(values)
@@ -1256,8 +1197,8 @@ class Connection(api.Connection):
msg = _("Cannot overwrite UUID for an existing Chassis.")
raise exception.InvalidParameterValue(err=msg)
- with _session_for_write():
- query = model_query(models.Chassis)
+ with _session_for_write() as session:
+ query = session.query(models.Chassis)
query = add_identity_where(query, models.Chassis, chassis_id)
count = query.update(values)
@@ -1268,19 +1209,14 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_chassis(self, chassis_id):
- def chassis_not_empty():
- """Checks whether the chassis does not have nodes."""
-
- query = model_query(models.Node)
+ with _session_for_write() as session:
+ query = session.query(models.Node)
query = add_node_filter_by_chassis(query, chassis_id)
- return query.count() != 0
-
- with _session_for_write():
- if chassis_not_empty():
+ if query.count() != 0:
raise exception.ChassisNotEmpty(chassis=chassis_id)
- query = model_query(models.Chassis)
+ query = session.query(models.Chassis)
query = add_identity_filter(query, chassis_id)
count = query.delete()
@@ -1290,7 +1226,7 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def register_conductor(self, values, update_existing=False):
with _session_for_write() as session:
- query = (model_query(models.Conductor)
+ query = (session.query(models.Conductor)
.filter_by(hostname=values['hostname']))
try:
ref = query.one()
@@ -1337,21 +1273,23 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def touch_conductor(self, hostname):
- with _session_for_write():
- query = model_query(models.Conductor)
- query = query.where(models.Conductor.hostname == hostname)
- # since we're not changing any other field, manually set updated_at
- # and since we're heartbeating, make sure that online=True
- count = query.update({'updated_at': timeutils.utcnow(),
- 'online': True})
- if count == 0:
- raise exception.ConductorNotFound(conductor=hostname)
+ with _session_for_write() as session:
+ query = sa.update(models.Conductor).where(
+ models.Conductor.hostname == hostname
+ ).values({
+ 'updated_at': timeutils.utcnow(),
+ 'online': True}
+ ).execution_options(synchronize_session=False)
+ res = session.execute(query)
+ count = res.rowcount
+ if count == 0:
+ raise exception.ConductorNotFound(conductor=hostname)
@oslo_db_api.retry_on_deadlock
def clear_node_reservations_for_conductor(self, hostname):
nodes = []
- with _session_for_write():
- query = (model_query(models.Node)
+ with _session_for_write() as session:
+ query = (session.query(models.Node)
.filter(models.Node.reservation.ilike(hostname)))
nodes = [node['uuid'] for node in query]
query.update({'reservation': None}, synchronize_session=False)
@@ -1365,8 +1303,8 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def clear_node_target_power_state(self, hostname):
nodes = []
- with _session_for_write():
- query = (model_query(models.Node)
+ with _session_for_write() as session:
+ query = (session.query(models.Node)
.filter(models.Node.reservation.ilike(hostname)))
query = query.filter(models.Node.target_power_state != sql.null())
nodes = [node['uuid'] for node in query]
@@ -1384,58 +1322,56 @@ class Connection(api.Connection):
'%(nodes)s', {'nodes': nodes})
def get_active_hardware_type_dict(self, use_groups=False):
- query = (model_query(models.ConductorHardwareInterfaces,
- models.Conductor)
- .join(models.Conductor))
- result = _filter_active_conductors(query)
-
- d2c = collections.defaultdict(set)
- for iface_row, cdr_row in result:
- hw_type = iface_row['hardware_type']
- if use_groups:
- key = '%s:%s' % (cdr_row['conductor_group'], hw_type)
- else:
- key = hw_type
- d2c[key].add(cdr_row['hostname'])
+ with _session_for_read() as session:
+ query = (session.query(models.ConductorHardwareInterfaces,
+ models.Conductor)
+ .join(models.Conductor))
+ result = _filter_active_conductors(query)
+
+ d2c = collections.defaultdict(set)
+ for iface_row, cdr_row in result:
+ hw_type = iface_row['hardware_type']
+ if use_groups:
+ key = '%s:%s' % (cdr_row['conductor_group'], hw_type)
+ else:
+ key = hw_type
+ d2c[key].add(cdr_row['hostname'])
return d2c
def get_offline_conductors(self, field='hostname'):
- field = getattr(models.Conductor, field)
- interval = CONF.conductor.heartbeat_timeout
- limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
- result = (model_query(field)
- .filter(models.Conductor.updated_at < limit))
- return [row[0] for row in result]
+ with _session_for_read() as session:
+ field = getattr(models.Conductor, field)
+ interval = CONF.conductor.heartbeat_timeout
+ limit = timeutils.utcnow() - datetime.timedelta(seconds=interval)
+ result = (session.query(field)
+ .filter(models.Conductor.updated_at < limit))
+ return [row[0] for row in result]
def get_online_conductors(self):
- query = model_query(models.Conductor.hostname)
- query = _filter_active_conductors(query)
- return [row[0] for row in query]
+ with _session_for_read() as session:
+ query = session.query(models.Conductor.hostname)
+ query = _filter_active_conductors(query)
+ return [row[0] for row in query]
def list_conductor_hardware_interfaces(self, conductor_id):
- query = (model_query(models.ConductorHardwareInterfaces)
- .where(models.ConductorHardwareInterfaces.conductor_id == conductor_id)) # noqa
- return query.all()
+ with _session_for_read() as session:
+ query = (session.query(models.ConductorHardwareInterfaces)
+ .filter_by(conductor_id=conductor_id))
+ return query.all()
def list_hardware_type_interfaces(self, hardware_types):
- query = (model_query(models.ConductorHardwareInterfaces)
- .filter(models.ConductorHardwareInterfaces.hardware_type
- .in_(hardware_types)))
+ with _session_for_read() as session:
+ query = (session.query(models.ConductorHardwareInterfaces)
+ .filter(models.ConductorHardwareInterfaces.hardware_type
+ .in_(hardware_types)))
- query = _filter_active_conductors(query)
- return query.all()
+ query = _filter_active_conductors(query)
+ return query.all()
@oslo_db_api.retry_on_deadlock
def register_conductor_hardware_interfaces(self, conductor_id, interfaces):
with _session_for_write() as session:
try:
- try:
- session.begin()
- except sa.exc.InvalidRequestError:
- # When running unit tests, the transaction reports as
- # already started, where as in service startup this is
- # the first write op.
- pass
for iface in interfaces:
conductor_hw_iface = models.ConductorHardwareInterfaces()
conductor_hw_iface['conductor_id'] = conductor_id
@@ -1450,22 +1386,22 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def unregister_conductor_hardware_interfaces(self, conductor_id):
- with _session_for_write():
- query = (model_query(models.ConductorHardwareInterfaces)
+ with _session_for_write() as session:
+ query = (session.query(models.ConductorHardwareInterfaces)
.filter_by(conductor_id=conductor_id))
query.delete()
@oslo_db_api.retry_on_deadlock
def touch_node_provisioning(self, node_id):
- with _session_for_write():
- query = model_query(models.Node)
+ with _session_for_write() as session:
+ query = session.query(models.Node)
query = add_identity_filter(query, node_id)
count = query.update({'provision_updated_at': timeutils.utcnow()})
if count == 0:
raise exception.NodeNotFound(node=node_id)
- def _check_node_exists(self, node_id):
- if not model_query(models.Node).where(
+ def _check_node_exists(self, session, node_id):
+ if not session.query(models.Node).where(
models.Node.id == node_id).scalar():
raise exception.NodeNotFound(node=node_id)
@@ -1485,24 +1421,25 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def unset_node_tags(self, node_id):
- self._check_node_exists(node_id)
- with _session_for_write():
- model_query(models.NodeTag).filter_by(node_id=node_id).delete()
+ with _session_for_write() as session:
+ self._check_node_exists(session, node_id)
+ session.query(models.NodeTag).filter_by(node_id=node_id).delete()
def get_node_tags_by_node_id(self, node_id):
- self._check_node_exists(node_id)
- result = (model_query(models.NodeTag)
- .filter_by(node_id=node_id)
- .all())
+ with _session_for_read() as session:
+ self._check_node_exists(session, node_id)
+ result = (session.query(models.NodeTag)
+ .filter_by(node_id=node_id)
+ .all())
return result
@oslo_db_api.retry_on_deadlock
def add_node_tag(self, node_id, tag):
- node_tag = models.NodeTag(tag=tag, node_id=node_id)
-
- self._check_node_exists(node_id)
try:
with _session_for_write() as session:
+ node_tag = models.NodeTag(tag=tag, node_id=node_id)
+
+ self._check_node_exists(session, node_id)
session.add(node_tag)
session.flush()
except db_exc.DBDuplicateEntry:
@@ -1513,18 +1450,20 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def delete_node_tag(self, node_id, tag):
- self._check_node_exists(node_id)
- with _session_for_write():
- result = model_query(models.NodeTag).filter_by(
+ with _session_for_write() as session:
+ self._check_node_exists(session, node_id)
+ result = session.query(models.NodeTag).filter_by(
node_id=node_id, tag=tag).delete()
- if not result:
- raise exception.NodeTagNotFound(node_id=node_id, tag=tag)
+ if not result:
+ raise exception.NodeTagNotFound(node_id=node_id, tag=tag)
def node_tag_exists(self, node_id, tag):
- self._check_node_exists(node_id)
- q = model_query(models.NodeTag).filter_by(node_id=node_id, tag=tag)
- return model_query(q.exists()).scalar()
+ with _session_for_read() as session:
+ self._check_node_exists(session, node_id)
+ q = session.query(models.NodeTag).filter_by(
+ node_id=node_id, tag=tag)
+ return session.query(q.exists()).scalar()
def get_node_by_port_addresses(self, addresses):
q = _get_node_select()
@@ -1549,7 +1488,7 @@ class Connection(api.Connection):
def get_volume_connector_list(self, limit=None, marker=None,
sort_key=None, sort_dir=None, project=None):
- query = model_query(models.VolumeConnector)
+ query = sa.select(models.VolumeConnector)
if project:
query = add_volume_conn_filter_by_node_project(query, project)
return _paginate_query(models.VolumeConnector, limit, marker,
@@ -1573,7 +1512,7 @@ class Connection(api.Connection):
def get_volume_connectors_by_node_id(self, node_id, limit=None,
marker=None, sort_key=None,
sort_dir=None, project=None):
- query = model_query(models.VolumeConnector).where(
+ query = sa.select(models.VolumeConnector).where(
models.VolumeConnector.node_id == node_id)
if project:
add_volume_conn_filter_by_node_project(query, project)
@@ -1608,7 +1547,7 @@ class Connection(api.Connection):
try:
with _session_for_write() as session:
- query = model_query(models.VolumeConnector)
+ query = session.query(models.VolumeConnector)
query = add_identity_filter(query, ident)
ref = query.one()
orig_type = ref['type']
@@ -1626,8 +1565,8 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_volume_connector(self, ident):
- with _session_for_write():
- query = model_query(models.VolumeConnector)
+ with _session_for_write() as session:
+ query = session.query(models.VolumeConnector)
query = add_identity_filter(query, ident)
count = query.delete()
if count == 0:
@@ -1635,7 +1574,7 @@ class Connection(api.Connection):
def get_volume_target_list(self, limit=None, marker=None,
sort_key=None, sort_dir=None, project=None):
- query = model_query(models.VolumeTarget)
+ query = sa.select(models.VolumeTarget)
if project:
query = add_volume_target_filter_by_node_project(query, project)
return _paginate_query(models.VolumeTarget, limit, marker,
@@ -1659,7 +1598,8 @@ class Connection(api.Connection):
def get_volume_targets_by_node_id(self, node_id, limit=None, marker=None,
sort_key=None, sort_dir=None,
project=None):
- query = model_query(models.VolumeTarget).filter_by(node_id=node_id)
+ query = sa.select(models.VolumeTarget).where(
+ models.VolumeTarget.node_id == node_id)
if project:
add_volume_target_filter_by_node_project(query, project)
return _paginate_query(models.VolumeTarget, limit, marker, sort_key,
@@ -1668,7 +1608,7 @@ class Connection(api.Connection):
def get_volume_targets_by_volume_id(self, volume_id, limit=None,
marker=None, sort_key=None,
sort_dir=None, project=None):
- query = model_query(models.VolumeTarget).where(
+ query = sa.select(models.VolumeTarget).where(
models.VolumeTarget.volume_id == volume_id)
if project:
query = add_volume_target_filter_by_node_project(query, project)
@@ -1702,7 +1642,7 @@ class Connection(api.Connection):
try:
with _session_for_write() as session:
- query = model_query(models.VolumeTarget)
+ query = session.query(models.VolumeTarget)
query = add_identity_filter(query, ident)
ref = query.one()
orig_boot_index = ref['boot_index']
@@ -1717,8 +1657,8 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_volume_target(self, ident):
- with _session_for_write():
- query = model_query(models.VolumeTarget)
+ with _session_for_write() as session:
+ query = session.query(models.VolumeTarget)
query = add_identity_filter(query, ident)
count = query.delete()
if count == 0:
@@ -1850,10 +1790,11 @@ class Connection(api.Connection):
all_models.append(models.Node)
sql_models = [model for model in all_models
if model.__name__ in mapping]
- for model in sql_models:
- version = mapping[model.__name__][0]
- query = model_query(model).filter(model.version != version)
- total_to_migrate += query.count()
+ with _session_for_read() as session:
+ for model in sql_models:
+ version = mapping[model.__name__][0]
+ query = session.query(model).filter(model.version != version)
+ total_to_migrate += query.count()
if not total_to_migrate:
return total_to_migrate, 0
@@ -1877,8 +1818,8 @@ class Connection(api.Connection):
for model in sql_models:
version = mapping[model.__name__][0]
num_migrated = 0
- with _session_for_write():
- query = model_query(model).filter(model.version != version)
+ with _session_for_write() as session:
+ query = session.query(model).filter(model.version != version)
# NOTE(rloo) Caution here; after doing query.count(), it is
# possible that the value is different in the
# next invocation of the query.
@@ -1890,14 +1831,14 @@ class Connection(api.Connection):
for obj in query.slice(0, max_to_migrate):
ids.append(obj['id'])
num_migrated = (
- model_query(model).
+ session.query(model).
filter(sql.and_(model.id.in_(ids),
model.version != version)).
update({model.version: version},
synchronize_session=False))
else:
num_migrated = (
- model_query(model).
+ session.query(model).
filter(model.version != version).
update({model.version: version},
synchronize_session=False))
@@ -1948,15 +1889,16 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def unset_node_traits(self, node_id):
- self._check_node_exists(node_id)
- with _session_for_write():
- model_query(models.NodeTrait).filter_by(node_id=node_id).delete()
+ with _session_for_write() as session:
+ self._check_node_exists(session, node_id)
+ session.query(models.NodeTrait).filter_by(node_id=node_id).delete()
def get_node_traits_by_node_id(self, node_id):
- self._check_node_exists(node_id)
- result = (model_query(models.NodeTrait)
- .filter_by(node_id=node_id)
- .all())
+ with _session_for_read() as session:
+ self._check_node_exists(session, node_id)
+ result = (session.query(models.NodeTrait)
+ .filter_by(node_id=node_id)
+ .all())
return result
@oslo_db_api.retry_on_deadlock
@@ -1964,13 +1906,14 @@ class Connection(api.Connection):
node_trait = models.NodeTrait(trait=trait, node_id=node_id,
version=version)
- self._check_node_exists(node_id)
try:
with _session_for_write() as session:
+ self._check_node_exists(session, node_id)
+
session.add(node_trait)
session.flush()
- num_traits = (model_query(models.NodeTrait)
+ num_traits = (session.query(models.NodeTrait)
.filter_by(node_id=node_id).count())
self._verify_max_traits_per_node(node_id, num_traits)
except db_exc.DBDuplicateEntry:
@@ -1981,25 +1924,26 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def delete_node_trait(self, node_id, trait):
- self._check_node_exists(node_id)
- with _session_for_write():
- result = model_query(models.NodeTrait).filter_by(
+ with _session_for_write() as session:
+ self._check_node_exists(session, node_id)
+ result = session.query(models.NodeTrait).filter_by(
node_id=node_id, trait=trait).delete()
- if not result:
- raise exception.NodeTraitNotFound(node_id=node_id, trait=trait)
+ if not result:
+ raise exception.NodeTraitNotFound(node_id=node_id, trait=trait)
def node_trait_exists(self, node_id, trait):
- self._check_node_exists(node_id)
- q = model_query(
- models.NodeTrait).filter_by(node_id=node_id, trait=trait)
- return model_query(q.exists()).scalar()
+ with _session_for_read() as session:
+ self._check_node_exists(session, node_id)
+ q = session.query(
+ models.NodeTrait).filter_by(node_id=node_id, trait=trait)
+ return session.query(q.exists()).scalar()
@oslo_db_api.retry_on_deadlock
def create_bios_setting_list(self, node_id, settings, version):
- self._check_node_exists(node_id)
bios_settings = []
with _session_for_write() as session:
+ self._check_node_exists(session, node_id)
try:
for setting in settings:
bios_setting = models.BIOSSetting(
@@ -2026,12 +1970,12 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def update_bios_setting_list(self, node_id, settings, version):
- self._check_node_exists(node_id)
bios_settings = []
with _session_for_write() as session:
+ self._check_node_exists(session, node_id)
try:
for setting in settings:
- query = model_query(models.BIOSSetting).filter_by(
+ query = session.query(models.BIOSSetting).filter_by(
node_id=node_id, name=setting['name'])
ref = query.one()
ref.update({'value': setting['value'],
@@ -2057,11 +2001,11 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def delete_bios_setting_list(self, node_id, names):
- self._check_node_exists(node_id)
missing_bios_settings = []
- with _session_for_write():
+ with _session_for_write() as session:
+ self._check_node_exists(session, node_id)
for name in names:
- count = model_query(models.BIOSSetting).filter_by(
+ count = session.query(models.BIOSSetting).filter_by(
node_id=node_id, name=name).delete()
if count == 0:
missing_bios_settings.append(name)
@@ -2070,20 +2014,22 @@ class Connection(api.Connection):
node=node_id, names=','.join(missing_bios_settings))
def get_bios_setting(self, node_id, name):
- self._check_node_exists(node_id)
- query = model_query(models.BIOSSetting).filter_by(
- node_id=node_id, name=name)
- try:
- ref = query.one()
- except NoResultFound:
- raise exception.BIOSSettingNotFound(node=node_id, name=name)
+ with _session_for_read() as session:
+ self._check_node_exists(session, node_id)
+ query = session.query(models.BIOSSetting).filter_by(
+ node_id=node_id, name=name)
+ try:
+ ref = query.one()
+ except NoResultFound:
+ raise exception.BIOSSettingNotFound(node=node_id, name=name)
return ref
def get_bios_setting_list(self, node_id):
- self._check_node_exists(node_id)
- result = (model_query(models.BIOSSetting)
- .filter_by(node_id=node_id)
- .all())
+ with _session_for_read() as session:
+ self._check_node_exists(session, node_id)
+ result = (session.query(models.BIOSSetting)
+ .filter_by(node_id=node_id)
+ .all())
return result
def get_allocation_by_id(self, allocation_id):
@@ -2093,11 +2039,13 @@ class Connection(api.Connection):
:returns: An allocation.
:raises: AllocationNotFound
"""
- query = model_query(models.Allocation).filter_by(id=allocation_id)
- try:
- return query.one()
- except NoResultFound:
- raise exception.AllocationNotFound(allocation=allocation_id)
+ with _session_for_read() as session:
+ query = session.query(models.Allocation).filter_by(
+ id=allocation_id)
+ try:
+ return query.one()
+ except NoResultFound:
+ raise exception.AllocationNotFound(allocation=allocation_id)
def get_allocation_by_uuid(self, allocation_uuid):
"""Return an allocation representation.
@@ -2106,11 +2054,13 @@ class Connection(api.Connection):
:returns: An allocation.
:raises: AllocationNotFound
"""
- query = model_query(models.Allocation).filter_by(uuid=allocation_uuid)
- try:
- return query.one()
- except NoResultFound:
- raise exception.AllocationNotFound(allocation=allocation_uuid)
+ with _session_for_read() as session:
+ query = session.query(models.Allocation).filter_by(
+ uuid=allocation_uuid)
+ try:
+ return query.one()
+ except NoResultFound:
+ raise exception.AllocationNotFound(allocation=allocation_uuid)
def get_allocation_by_name(self, name):
"""Return an allocation representation.
@@ -2119,11 +2069,12 @@ class Connection(api.Connection):
:returns: An allocation.
:raises: AllocationNotFound
"""
- query = model_query(models.Allocation).filter_by(name=name)
- try:
- return query.one()
- except NoResultFound:
- raise exception.AllocationNotFound(allocation=name)
+ with _session_for_read() as session:
+ query = session.query(models.Allocation).filter_by(name=name)
+ try:
+ return query.one()
+ except NoResultFound:
+ raise exception.AllocationNotFound(allocation=name)
def get_allocation_list(self, filters=None, limit=None, marker=None,
sort_key=None, sort_dir=None):
@@ -2142,8 +2093,9 @@ class Connection(api.Connection):
(asc, desc)
:returns: A list of allocations.
"""
- query = self._add_allocations_filters(model_query(models.Allocation),
- filters)
+ query = self._add_allocations_filters(
+ sa.select(models.Allocation),
+ filters)
return _paginate_query(models.Allocation, limit, marker,
sort_key, sort_dir, query)
@@ -2200,14 +2152,14 @@ class Connection(api.Connection):
with _session_for_write() as session:
try:
- query = model_query(models.Allocation, session=session)
+ query = session.query(models.Allocation)
query = add_identity_filter(query, allocation_id)
ref = query.one()
ref.update(values)
instance_uuid = ref.uuid
if values.get('node_id') and update_node:
- node = model_query(models.Node, session=session).filter_by(
+ node = session.query(models.Node).filter_by(
id=ref.node_id).with_for_update().one()
node_uuid = node.uuid
if node.instance_uuid and node.instance_uuid != ref.uuid:
@@ -2252,7 +2204,7 @@ class Connection(api.Connection):
"""
with _session_for_write() as session:
try:
- query = model_query(models.Allocation, session=session)
+ query = session.query(models.Allocation)
query = add_identity_filter(query, allocation_id)
# NOTE(dtantsur): the FOR UPDATE clause locks the allocation
ref = query.with_for_update().one()
@@ -2275,7 +2227,7 @@ class Connection(api.Connection):
:raises: AllocationNotFound
"""
with _session_for_write() as session:
- query = model_query(models.Allocation)
+ query = session.query(models.Allocation)
query = add_identity_filter(query, allocation_id)
try:
@@ -2285,7 +2237,7 @@ class Connection(api.Connection):
allocation_id = ref['id']
- node_query = model_query(models.Node, session=session).filter_by(
+ node_query = session.query(models.Node).filter_by(
allocation_id=allocation_id)
node_query.update({'allocation_id': None, 'instance_uuid': None})
@@ -2338,7 +2290,7 @@ class Connection(api.Connection):
return step.interface, step.step, sortable_args, step.priority
# List all existing steps for the template.
- current_steps = (model_query(models.DeployTemplateStep)
+ current_steps = (session.query(models.DeployTemplateStep)
.filter_by(deploy_template_id=template_id))
# List the new steps for the template.
@@ -2362,7 +2314,7 @@ class Connection(api.Connection):
# Delete and create steps in bulk as necessary.
if step_ids_to_delete:
- ((model_query(models.DeployTemplateStep)
+ ((session.query(models.DeployTemplateStep)
.filter(models.DeployTemplateStep.id.in_(step_ids_to_delete)))
.delete(synchronize_session=False))
if steps_to_create:
@@ -2378,7 +2330,7 @@ class Connection(api.Connection):
with _session_for_write() as session:
# NOTE(mgoddard): Don't issue a joined query for the update as
# this does not work with PostgreSQL.
- query = model_query(models.DeployTemplate)
+ query = session.query(models.DeployTemplate)
query = add_identity_filter(query, template_id)
ref = query.with_for_update().one()
# First, update non-step columns.
@@ -2406,10 +2358,10 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_deploy_template(self, template_id):
- with _session_for_write():
- model_query(models.DeployTemplateStep).filter_by(
+ with _session_for_write() as session:
+ session.query(models.DeployTemplateStep).filter_by(
deploy_template_id=template_id).delete()
- count = model_query(models.DeployTemplate).filter_by(
+ count = session.query(models.DeployTemplate).filter_by(
id=template_id).delete()
if count == 0:
raise exception.DeployTemplateNotFound(template=template_id)
@@ -2439,7 +2391,8 @@ class Connection(api.Connection):
def get_deploy_template_list(self, limit=None, marker=None,
sort_key=None, sort_dir=None):
- query = _get_deploy_template_query_with_steps()
+ query = model_query(models.DeployTemplate).options(
+ selectinload(models.DeployTemplate.steps))
return _paginate_query(models.DeployTemplate, limit, marker,
sort_key, sort_dir, query)
@@ -2469,8 +2422,8 @@ class Connection(api.Connection):
@oslo_db_api.retry_on_deadlock
def destroy_node_history_by_uuid(self, history_uuid):
- with _session_for_write():
- query = model_query(models.NodeHistory).filter_by(
+ with _session_for_write() as session:
+ query = session.query(models.NodeHistory).filter_by(
uuid=history_uuid)
count = query.delete()
if count == 0:
diff --git a/ironic/tests/unit/db/test_conductor.py b/ironic/tests/unit/db/test_conductor.py
index 0187ebca0..efc3a38a3 100644
--- a/ironic/tests/unit/db/test_conductor.py
+++ b/ironic/tests/unit/db/test_conductor.py
@@ -18,9 +18,6 @@
import datetime
from unittest import mock
-import oslo_db
-from oslo_db import exception as db_exc
-from oslo_db import sqlalchemy
from oslo_utils import timeutils
from ironic.common import exception
@@ -158,18 +155,6 @@ class DbConductorTestCase(base.DbTestCase):
c = self.dbapi.get_conductor(c.hostname)
self.assertEqual(test_time, timeutils.normalize_time(c.updated_at))
- @mock.patch.object(oslo_db.api.time, 'sleep', autospec=True)
- @mock.patch.object(sqlalchemy.orm.Query, 'update', autospec=True)
- def test_touch_conductor_deadlock(self, mock_update, mock_sleep):
- mock_sleep.return_value = None
- mock_update.side_effect = [db_exc.DBDeadlock(), None]
- c = self._create_test_cdr()
- self.dbapi.touch_conductor(c.hostname)
- self.assertEqual(2, mock_update.call_count)
- # Count that it was called, but not the number of times
- # as this is *actually* time.sleep via import from oslo_db.api
- self.assertTrue(mock_sleep.called)
-
def test_touch_conductor_not_found(self):
# A conductor's heartbeat will not create a new record,
# it will only update existing ones