summaryrefslogtreecommitdiff
path: root/ironic/db/sqlalchemy/api.py
diff options
context:
space:
mode:
Diffstat (limited to 'ironic/db/sqlalchemy/api.py')
-rw-r--r--  ironic/db/sqlalchemy/api.py  549
1 files changed, 360 insertions, 189 deletions
diff --git a/ironic/db/sqlalchemy/api.py b/ironic/db/sqlalchemy/api.py
index c14719af8..b05af3637 100644
--- a/ironic/db/sqlalchemy/api.py
+++ b/ironic/db/sqlalchemy/api.py
@@ -19,9 +19,11 @@ import datetime
import json
import threading
+from oslo_concurrency import lockutils
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import enginefacade
+from oslo_db.sqlalchemy import orm as sa_orm
from oslo_db.sqlalchemy import utils as db_utils
from oslo_log import log
from oslo_utils import netutils
@@ -31,7 +33,7 @@ from oslo_utils import uuidutils
from osprofiler import sqlalchemy as osp_sqlalchemy
import sqlalchemy as sa
from sqlalchemy import or_
-from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound
+from sqlalchemy.exc import NoResultFound, MultipleResultsFound
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import Load
from sqlalchemy.orm import selectinload
@@ -53,6 +55,10 @@ LOG = log.getLogger(__name__)
_CONTEXT = threading.local()
+
+RESERVATION_SEMAPHORE = "reserve_node_db_lock"
+synchronized = lockutils.synchronized_with_prefix('ironic-')
+
# NOTE(mgoddard): We limit the number of traits per node to 50 as this is the
# maximum number of traits per resource provider allowed in placement.
MAX_TRAITS_PER_NODE = 50
@@ -83,6 +89,11 @@ def _wrap_session(session):
def _get_node_query_with_all_for_single_node():
"""Return a query object for the Node joined with all relevant fields.
+ Deprecated: This method, while useful, returns a "Legacy Query" object
    which is considered a legacy object from SQLAlchemy
+ which at some point may be removed. SQLAlchemy encourages all users
+ to move to the unified ORM/Core Select interface.
+
This method utilizes a joined load query which creates a result set
where corresponding traits, and tags, are joined together in the result
set.
@@ -109,9 +120,10 @@ def _get_node_query_with_all_for_single_node():
Where joins are super in-efficent for Ironic, is where nodes are being
enumerated, as the above result set pattern is not just for one node, but
- potentially thousands of nodes. In that case, we should use the
- _get_node_query_with_all_for_list helper to return a more appropriate
- query object which will be more efficient for the end user.
+ potentially thousands of nodes. Please consider using _get_node_select
+ which results in a primary query for the nodes, and then performs
+ additional targeted queries for the joined tables, as opposed to
+ performing client side de-duplication.
:returns: a query object.
"""
@@ -127,49 +139,47 @@ def _get_node_query_with_all_for_single_node():
# 2.43 seconds to obtain all nodes from SQLAlchemy (10k nodes)
# 5.15 seconds to obtain all nodes *and* have node objects (10k nodes)
return (model_query(models.Node)
- .options(joinedload('tags'))
- .options(joinedload('traits')))
+ .options(joinedload(models.Node.tags))
+ .options(joinedload(models.Node.traits)))
-def _get_node_query_with_all_for_list():
- """Return a query object for the Node with queried extra fields.
+def _get_node_select():
+ """Returns a SQLAlchemy Select Object for Nodes.
- This method returns a query object joining tags and traits in a pattern
- where the result set is first built, and then the resulting associations
- are queried separately and the objects are reconciled by SQLAlchemy to
- build the composite objects based upon the associations.
+ This method returns a pre-formatted select object which models
+ the entire Node object, allowing callers to operate on a node like
+ they would have with an SQLAlchemy ORM Query Object.
- This results in the following query pattern when the query is executed:
+ This object *also* performs two additional select queries, in the form
+ of a selectin operation, to achieve the same results of a Join query,
+ but without the join query itself, and the client side load.
- select $fields from nodes where x;
- # SQLAlchemy creates a list of associated node IDs.
- select $fields from tags where node_id in ('1', '3', '37268');
- select $fields from traits where node_id in ('1', '3', '37268');
+ This method is best utilized when retrieving lists of nodes.
- SQLAlchemy then returns a result set where the tags and traits are
- composited together efficently as opposed to having to deduplicate
- the result set. This shifts additional load to the database which
- was previously a high overhead operation with-in the conductor...
- which results in a slower conductor.
+ Select objects in this fashion were added as a result of SQLAlchemy 1.4
+ in preparation for SQLAlchemy 2.0's release to provide a unified
+ select interface.
- :returns: a query object.
+ :returns: a select object
"""
- # NOTE(TheJulia): When comparing CI rubs *with* this being the default
- # for all general list operations, at 10k nodes, this pattern appears
- # to be on-par with a 5% variability between the two example benchmark
- # tests. That being said, the test *does* not include tags or traits
- # in it's test data set so client side deduplication is not measured.
- # NOTE(TheJulia): Basic benchmark difference
- # tests data creation: 67.117 seconds
- # 2.32 seconds to obtain all nodes from SQLAlchemy (10k nodes)
- # 4.99 seconds to obtain all nodes *and* have node objects (10k nodes)
- # If this holds true, the required record deduplication with joinedload
- # may be basically the same amount of overhead as requesting the tags
- # and traits separately.
- return (model_query(models.Node)
- .options(selectinload('tags'))
- .options(selectinload('traits')))
+ # NOTE(TheJulia): This returns a query in the SQLAlchemy 1.4->2.0
+ # migration style as query model loading is deprecated.
+
+    # This must use selectinload to avoid later need to invoke deduplication.
+ return (sa.select(models.Node)
+ .options(selectinload(models.Node.tags),
+ selectinload(models.Node.traits)))
+
+
+def _get_deploy_template_select_with_steps():
+ """Return a select object for the DeployTemplate joined with steps.
+
+ :returns: a select object.
+ """
+ return sa.select(
+ models.DeployTemplate
+ ).options(selectinload(models.DeployTemplate.steps))
def _get_deploy_template_query_with_steps():
@@ -177,7 +187,8 @@ def _get_deploy_template_query_with_steps():
:returns: a query object.
"""
- return model_query(models.DeployTemplate).options(joinedload('steps'))
+ return model_query(models.DeployTemplate).options(
+ selectinload(models.DeployTemplate.steps))
def model_query(model, *args, **kwargs):
@@ -209,6 +220,26 @@ def add_identity_filter(query, value):
raise exception.InvalidIdentity(identity=value)
+def add_identity_where(op, model, value):
+ """Adds an identity filter to operation for where method.
+
+ Filters results by ID, if supplied value is a valid integer.
+ Otherwise attempts to filter results by UUID.
+
+ :param op: Initial operation to add filter to.
+ i.e. a update or delete statement.
+ :param model: The SQLAlchemy model to apply.
+ :param value: Value for filtering results by.
+ :return: Modified query.
+ """
+ if strutils.is_int_like(value):
+ return op.where(model.id == value)
+ elif uuidutils.is_uuid_like(value):
+ return op.where(model.uuid == value)
+ else:
+ raise exception.InvalidIdentity(identity=value)
+
+
def add_port_filter(query, value):
"""Adds a port-specific filter to a query.
@@ -281,7 +312,7 @@ def add_portgroup_filter(query, value):
if netutils.is_valid_mac(value):
return query.filter_by(address=value)
else:
- return add_identity_filter(query, value)
+ return add_identity_where(query, models.Portgroup, value)
def add_portgroup_filter_by_node(query, value):
@@ -332,8 +363,10 @@ def add_allocation_filter_by_conductor(query, value):
def _paginate_query(model, limit=None, marker=None, sort_key=None,
- sort_dir=None, query=None):
- if not query:
+ sort_dir=None, query=None, return_base_tuple=False):
+ # NOTE(TheJulia): We can't just ask for the bool of query if it is
+ # populated, so we need to ask if it is None.
+ if query is None:
query = model_query(model)
sort_keys = ['id']
if sort_key and sort_key not in sort_keys:
@@ -345,7 +378,28 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None,
raise exception.InvalidParameterValue(
_('The sort_key value "%(key)s" is an invalid field for sorting')
% {'key': sort_key})
- return query.all()
+ with _session_for_read() as session:
+ # NOTE(TheJulia): SQLAlchemy 2.0 no longer returns pre-uniqued result
+ # sets in ORM mode, so we need to explicitly ask for it to be unique
+ # before returning it to the caller.
+ if isinstance(query, sa_orm.Query):
+ # The classic "Legacy" ORM query object result set which is
+ # deprecated in advance of SQLAlchemy 2.0.
+ return query.all()
+ else:
+ # In this case, we have a sqlalchemy.sql.selectable.Select
+ # (most likely) which utilizes the unified select interface.
+ res = session.execute(query).fetchall()
+ if len(res) == 0:
+ # Return an empty list instead of a class with no objects.
+ return []
+ if return_base_tuple:
+ # The caller expects a tuple, lets just give it to them.
+ return res
+ # Everything is a tuple in a resultset from the unified interface
+ # but for objects, our model expects just object access,
+ # so we extract and return them.
+ return [r[0] for r in res]
def _filter_active_conductors(query, interval=None):
@@ -514,15 +568,16 @@ class Connection(api.Connection):
else:
columns = [getattr(models.Node, c) for c in columns]
- query = model_query(*columns, base_model=models.Node)
+ query = sa.select(*columns)
query = self._add_nodes_filters(query, filters)
return _paginate_query(models.Node, limit, marker,
- sort_key, sort_dir, query)
+ sort_key, sort_dir, query,
+ return_base_tuple=True)
def get_node_list(self, filters=None, limit=None, marker=None,
sort_key=None, sort_dir=None, fields=None):
if not fields:
- query = _get_node_query_with_all_for_list()
+ query = _get_node_select()
query = self._add_nodes_filters(query, filters)
return _paginate_query(models.Node, limit, marker,
sort_key, sort_dir, query)
@@ -559,24 +614,25 @@ class Connection(api.Connection):
# with SQLAlchemy.
traits_found = True
use_columns.remove('traits')
-
# Generate the column object list so SQLAlchemy only fulfills
# the requested columns.
use_columns = [getattr(models.Node, c) for c in use_columns]
-
# In essence, traits (and anything else needed to generate the
# composite objects) need to be reconciled without using a join
# as multiple rows can be generated in the result set being returned
# from the database server. In this case, with traits, we use
# a selectinload pattern.
if traits_found:
- query = model_query(models.Node).options(
- Load(models.Node).load_only(*use_columns),
- selectinload(models.Node.traits))
+ query = sa.select(models.Node).options(
+ selectinload(models.Node.traits),
+ Load(models.Node).load_only(*use_columns)
+ )
else:
- query = model_query(models.Node).options(
- Load(models.Node).load_only(*use_columns))
-
+ # Note for others, if you ask for a whole model, it is
+ # modeled, i.e. you can access it as an object.
+ query = sa.select(models.NodeBase).options(
+ Load(models.Node).load_only(*use_columns)
+ )
query = self._add_nodes_filters(query, filters)
return _paginate_query(models.Node, limit, marker,
sort_key, sort_dir, query)
@@ -618,40 +674,85 @@ class Connection(api.Connection):
return mapping
+ @synchronized(RESERVATION_SEMAPHORE, fair=True)
+ def _reserve_node_place_lock(self, tag, node_id, node):
+ try:
+ # NOTE(TheJulia): We explicitly do *not* synch the session
+ # so the other actions in the conductor do not become aware
+ # that the lock is in place and believe they hold the lock.
+ # This necessitates an overall lock in the code side, so
+ # we avoid conditions where two separate threads can believe
+ # they hold locks at the same time.
+ with _session_for_write() as session:
+ res = session.execute(
+ sa.update(models.Node).
+ where(models.Node.id == node.id).
+ where(models.Node.reservation == None). # noqa
+ values(reservation=tag).
+ execution_options(synchronize_session=False))
+ session.flush()
+ node = self._get_node_by_id_no_joins(node.id)
+ # NOTE(TheJulia): In SQLAlchemy 2.0 style, we don't
+ # magically get a changed node as they moved from the
+ # many ways to do things to singular ways to do things.
+ if res.rowcount != 1:
+ # Nothing updated and node exists. Must already be
+ # locked.
+ raise exception.NodeLocked(node=node.uuid,
+ host=node.reservation)
+ except NoResultFound:
+ # In the event that someone has deleted the node on
+ # another thread.
+ raise exception.NodeNotFound(node=node_id)
+
@oslo_db_api.retry_on_deadlock
def reserve_node(self, tag, node_id):
- with _session_for_write():
- query = _get_node_query_with_all_for_single_node()
- query = add_identity_filter(query, node_id)
- count = query.filter_by(reservation=None).update(
- {'reservation': tag}, synchronize_session=False)
+ with _session_for_read():
try:
+ # TODO(TheJulia): Figure out a good way to query
+ # this so that we do it as light as possible without
+ # the full object invocation, which will speed lock
+ # activities. Granted, this is all at the DB level
+ # so maybe that is okay in the grand scheme of things.
+ query = model_query(models.Node)
+ query = add_identity_filter(query, node_id)
node = query.one()
- if count != 1:
- # Nothing updated and node exists. Must already be
- # locked.
- raise exception.NodeLocked(node=node.uuid,
- host=node['reservation'])
- return node
except NoResultFound:
raise exception.NodeNotFound(node=node_id)
+ if node.reservation:
+ # Fail fast, instead of attempt the update.
+ raise exception.NodeLocked(node=node.uuid,
+ host=node.reservation)
+ self._reserve_node_place_lock(tag, node_id, node)
+ # Return a node object as that is the contract for this method.
+ return self.get_node_by_id(node.id)
@oslo_db_api.retry_on_deadlock
def release_node(self, tag, node_id):
- with _session_for_write():
- query = model_query(models.Node)
- query = add_identity_filter(query, node_id)
- # be optimistic and assume we usually release a reservation
- count = query.filter_by(reservation=tag).update(
- {'reservation': None}, synchronize_session=False)
+ with _session_for_read():
try:
- if count != 1:
- node = query.one()
- if node['reservation'] is None:
+ query = model_query(models.Node)
+ query = add_identity_filter(query, node_id)
+ node = query.one()
+ except NoResultFound:
+ raise exception.NodeNotFound(node=node_id)
+ with _session_for_write() as session:
+ try:
+ res = session.execute(
+ sa.update(models.Node).
+ where(models.Node.id == node.id).
+ where(models.Node.reservation == tag).
+ values(reservation=None).
+ execution_options(synchronize_session=False)
+ )
+ node = self.get_node_by_id(node.id)
+ if res.rowcount != 1:
+ if node.reservation is None:
raise exception.NodeNotLocked(node=node.uuid)
else:
raise exception.NodeLocked(node=node.uuid,
host=node['reservation'])
+ session.flush()
except NoResultFound:
raise exception.NodeNotFound(node=node_id)
@@ -677,47 +778,68 @@ class Connection(api.Connection):
node = models.Node()
node.update(values)
- with _session_for_write() as session:
- try:
+ try:
+ with _session_for_write() as session:
session.add(node)
+ # Set tags & traits to [] for new created node
+ # NOTE(mgoddard): We need to set the tags and traits fields in
+ # the session context, otherwise SQLAlchemy will try and fail
+ # to lazy load the attributes, resulting in an exception being
+ # raised.
+ node['tags'] = []
+ node['traits'] = []
session.flush()
- except db_exc.DBDuplicateEntry as exc:
- if 'name' in exc.columns:
- raise exception.DuplicateName(name=values['name'])
- elif 'instance_uuid' in exc.columns:
- raise exception.InstanceAssociated(
- instance_uuid=values['instance_uuid'],
- node=values['uuid'])
- raise exception.NodeAlreadyExists(uuid=values['uuid'])
- # Set tags & traits to [] for new created node
- # NOTE(mgoddard): We need to set the tags and traits fields in the
- # session context, otherwise SQLAlchemy will try and fail to lazy
- # load the attributes, resulting in an exception being raised.
- node['tags'] = []
- node['traits'] = []
+ except db_exc.DBDuplicateEntry as exc:
+ if 'name' in exc.columns:
+ raise exception.DuplicateName(name=values['name'])
+ elif 'instance_uuid' in exc.columns:
+ raise exception.InstanceAssociated(
+ instance_uuid=values['instance_uuid'],
+ node=values['uuid'])
+ raise exception.NodeAlreadyExists(uuid=values['uuid'])
return node
+ def _get_node_by_id_no_joins(self, node_id):
+ # TODO(TheJulia): Maybe replace with this with a minimal
+ # "get these three fields" thing.
+ try:
+ with _session_for_read() as session:
+ # Explicitly load NodeBase as the invocation of the
+                # primary model object results in the join query
+ # triggering.
+ return session.execute(
+ sa.select(models.NodeBase).filter_by(id=node_id).limit(1)
+ ).scalars().first()
+ except NoResultFound:
+ raise exception.NodeNotFound(node=node_id)
+
def get_node_by_id(self, node_id):
- query = _get_node_query_with_all_for_single_node()
- query = query.filter_by(id=node_id)
try:
- return query.one()
+ query = _get_node_select()
+ with _session_for_read() as session:
+ return session.scalars(
+ query.filter_by(id=node_id).limit(1)
+ ).unique().one()
except NoResultFound:
raise exception.NodeNotFound(node=node_id)
def get_node_by_uuid(self, node_uuid):
- query = _get_node_query_with_all_for_single_node()
- query = query.filter_by(uuid=node_uuid)
try:
- return query.one()
+ query = _get_node_select()
+ with _session_for_read() as session:
+ return session.scalars(
+ query.filter_by(uuid=node_uuid).limit(1)
+ ).unique().one()
except NoResultFound:
raise exception.NodeNotFound(node=node_uuid)
def get_node_by_name(self, node_name):
- query = _get_node_query_with_all_for_single_node()
- query = query.filter_by(name=node_name)
try:
- return query.one()
+ query = _get_node_select()
+ with _session_for_read() as session:
+ return session.scalars(
+ query.filter_by(name=node_name).limit(1)
+ ).unique().one()
except NoResultFound:
raise exception.NodeNotFound(node=node_name)
@@ -725,15 +847,14 @@ class Connection(api.Connection):
if not uuidutils.is_uuid_like(instance):
raise exception.InvalidUUID(uuid=instance)
- query = _get_node_query_with_all_for_single_node()
- query = query.filter_by(instance_uuid=instance)
-
try:
- result = query.one()
+ query = _get_node_select()
+ with _session_for_read() as session:
+ return session.scalars(
+ query.filter_by(instance_uuid=instance).limit(1)
+ ).unique().one()
except NoResultFound:
- raise exception.InstanceNotFound(instance=instance)
-
- return result
+ raise exception.InstanceNotFound(instance_uuid=instance)
@oslo_db_api.retry_on_deadlock
def destroy_node(self, node_id):
@@ -846,10 +967,14 @@ class Connection(api.Connection):
ref.update(values)
- # Return the updated node model joined with all relevant fields.
- query = _get_node_query_with_all_for_single_node()
- query = add_identity_filter(query, node_id)
- return query.one()
+ # Return the updated node model joined with all relevant fields.
+ query = _get_node_select()
+ query = add_identity_filter(query, node_id)
+ # FIXME(TheJulia): This entire method needs to be re-written to
+ # use the proper execution format for SQLAlchemy 2.0. Likely
+ # A query, independent update, and a re-query on the transaction.
+ with _session_for_read() as session:
+ return session.execute(query).one()[0]
def get_port_by_id(self, port_id):
query = model_query(models.Port).filter_by(id=port_id)
@@ -886,7 +1011,7 @@ class Connection(api.Connection):
def get_port_list(self, limit=None, marker=None,
sort_key=None, sort_dir=None, owner=None,
project=None):
- query = model_query(models.Port)
+ query = sa.select(models.Port)
if owner:
query = add_port_filter_by_node_owner(query, owner)
elif project:
@@ -897,8 +1022,7 @@ class Connection(api.Connection):
def get_ports_by_node_id(self, node_id, limit=None, marker=None,
sort_key=None, sort_dir=None, owner=None,
project=None):
- query = model_query(models.Port)
- query = query.filter_by(node_id=node_id)
+ query = sa.select(models.Port).where(models.Port.node_id == node_id)
if owner:
query = add_port_filter_by_node_owner(query, owner)
elif project:
@@ -909,8 +1033,10 @@ class Connection(api.Connection):
def get_ports_by_portgroup_id(self, portgroup_id, limit=None, marker=None,
sort_key=None, sort_dir=None, owner=None,
project=None):
- query = model_query(models.Port)
- query = query.filter_by(portgroup_id=portgroup_id)
+ query = sa.select(models.Port).where(
+ models.Port.portgroup_id == portgroup_id
+ )
+
if owner:
query = add_port_filter_by_node_owner(query, owner)
elif project:
@@ -925,15 +1051,15 @@ class Connection(api.Connection):
port = models.Port()
port.update(values)
- with _session_for_write() as session:
- try:
+ try:
+ with _session_for_write() as session:
session.add(port)
session.flush()
- except db_exc.DBDuplicateEntry as exc:
- if 'address' in exc.columns:
- raise exception.MACAlreadyExists(mac=values['address'])
- raise exception.PortAlreadyExists(uuid=values['uuid'])
- return port
+ except db_exc.DBDuplicateEntry as exc:
+ if 'address' in exc.columns:
+ raise exception.MACAlreadyExists(mac=values['address'])
+ raise exception.PortAlreadyExists(uuid=values['uuid'])
+ return port
@oslo_db_api.retry_on_deadlock
def update_port(self, port_id, values):
@@ -941,7 +1067,6 @@ class Connection(api.Connection):
if 'uuid' in values:
msg = _("Cannot overwrite UUID for an existing Port.")
raise exception.InvalidParameterValue(err=msg)
-
try:
with _session_for_write() as session:
query = model_query(models.Port)
@@ -1010,7 +1135,7 @@ class Connection(api.Connection):
def get_portgroups_by_node_id(self, node_id, limit=None, marker=None,
sort_key=None, sort_dir=None, project=None):
query = model_query(models.Portgroup)
- query = query.filter_by(node_id=node_id)
+ query = query.where(models.Portgroup.node_id == node_id)
if project:
query = add_portgroup_filter_by_node_project(query, project)
return _paginate_query(models.Portgroup, limit, marker,
@@ -1067,34 +1192,40 @@ class Connection(api.Connection):
def destroy_portgroup(self, portgroup_id):
def portgroup_not_empty(session):
"""Checks whether the portgroup does not have ports."""
-
- query = model_query(models.Port)
- query = add_port_filter_by_portgroup(query, portgroup_id)
-
- return query.count() != 0
+ with _session_for_read() as session:
+ return session.scalar(
+ sa.select(
+ sa.func.count(models.Port.id)
+ ).where(models.Port.portgroup_id == portgroup_id)) != 0
with _session_for_write() as session:
if portgroup_not_empty(session):
raise exception.PortgroupNotEmpty(portgroup=portgroup_id)
- query = model_query(models.Portgroup, session=session)
- query = add_identity_filter(query, portgroup_id)
+ query = sa.delete(models.Portgroup)
+ query = add_identity_where(query, models.Portgroup, portgroup_id)
- count = query.delete()
+ count = session.execute(query).rowcount
if count == 0:
raise exception.PortgroupNotFound(portgroup=portgroup_id)
def get_chassis_by_id(self, chassis_id):
- query = model_query(models.Chassis).filter_by(id=chassis_id)
+ query = sa.select(models.Chassis).where(
+ models.Chassis.id == chassis_id)
+
try:
- return query.one()
+ with _session_for_read() as session:
+ return session.execute(query).one()[0]
except NoResultFound:
raise exception.ChassisNotFound(chassis=chassis_id)
def get_chassis_by_uuid(self, chassis_uuid):
- query = model_query(models.Chassis).filter_by(uuid=chassis_uuid)
+ query = sa.select(models.Chassis).where(
+ models.Chassis.uuid == chassis_uuid)
+
try:
- return query.one()
+ with _session_for_read() as session:
+ return session.execute(query).one()[0]
except NoResultFound:
raise exception.ChassisNotFound(chassis=chassis_uuid)
@@ -1110,13 +1241,13 @@ class Connection(api.Connection):
chassis = models.Chassis()
chassis.update(values)
- with _session_for_write() as session:
- try:
+ try:
+ with _session_for_write() as session:
session.add(chassis)
session.flush()
- except db_exc.DBDuplicateEntry:
- raise exception.ChassisAlreadyExists(uuid=values['uuid'])
- return chassis
+ except db_exc.DBDuplicateEntry:
+ raise exception.ChassisAlreadyExists(uuid=values['uuid'])
+ return chassis
@oslo_db_api.retry_on_deadlock
def update_chassis(self, chassis_id, values):
@@ -1127,7 +1258,7 @@ class Connection(api.Connection):
with _session_for_write():
query = model_query(models.Chassis)
- query = add_identity_filter(query, chassis_id)
+ query = add_identity_where(query, models.Chassis, chassis_id)
count = query.update(values)
if count != 1:
@@ -1183,27 +1314,32 @@ class Connection(api.Connection):
def get_conductor(self, hostname, online=True):
try:
- query = model_query(models.Conductor).filter_by(hostname=hostname)
+ query = sa.select(models.Conductor).where(
+ models.Conductor.hostname == hostname)
if online is not None:
- query = query.filter_by(online=online)
- return query.one()
+ query = query.where(models.Conductor.online == online)
+ with _session_for_read() as session:
+ res = session.execute(query).one()[0]
+ return res
except NoResultFound:
raise exception.ConductorNotFound(conductor=hostname)
@oslo_db_api.retry_on_deadlock
def unregister_conductor(self, hostname):
- with _session_for_write():
- query = (model_query(models.Conductor)
- .filter_by(hostname=hostname, online=True))
- count = query.update({'online': False})
+ with _session_for_write() as session:
+ query = sa.update(models.Conductor).where(
+ models.Conductor.hostname == hostname,
+ models.Conductor.online == True).values( # noqa
+ online=False)
+ count = session.execute(query).rowcount
if count == 0:
raise exception.ConductorNotFound(conductor=hostname)
@oslo_db_api.retry_on_deadlock
def touch_conductor(self, hostname):
with _session_for_write():
- query = (model_query(models.Conductor)
- .filter_by(hostname=hostname))
+ query = model_query(models.Conductor)
+ query = query.where(models.Conductor.hostname == hostname)
# since we're not changing any other field, manually set updated_at
# and since we're heartbeating, make sure that online=True
count = query.update({'updated_at': timeutils.utcnow(),
@@ -1278,7 +1414,7 @@ class Connection(api.Connection):
def list_conductor_hardware_interfaces(self, conductor_id):
query = (model_query(models.ConductorHardwareInterfaces)
- .filter_by(conductor_id=conductor_id))
+ .where(models.ConductorHardwareInterfaces.conductor_id == conductor_id)) # noqa
return query.all()
def list_hardware_type_interfaces(self, hardware_types):
@@ -1293,6 +1429,13 @@ class Connection(api.Connection):
def register_conductor_hardware_interfaces(self, conductor_id, interfaces):
with _session_for_write() as session:
try:
+ try:
+ session.begin()
+ except sa.exc.InvalidRequestError:
+ # When running unit tests, the transaction reports as
+ # already started, where as in service startup this is
+ # the first write op.
+ pass
for iface in interfaces:
conductor_hw_iface = models.ConductorHardwareInterfaces()
conductor_hw_iface['conductor_id'] = conductor_id
@@ -1322,7 +1465,8 @@ class Connection(api.Connection):
raise exception.NodeNotFound(node=node_id)
def _check_node_exists(self, node_id):
- if not model_query(models.Node).filter_by(id=node_id).scalar():
+ if not model_query(models.Node).where(
+ models.Node.id == node_id).scalar():
raise exception.NodeNotFound(node=node_id)
@oslo_db_api.retry_on_deadlock
@@ -1383,12 +1527,17 @@ class Connection(api.Connection):
return model_query(q.exists()).scalar()
def get_node_by_port_addresses(self, addresses):
- q = _get_node_query_with_all_for_single_node()
+ q = _get_node_select()
q = q.distinct().join(models.Port)
q = q.filter(models.Port.address.in_(addresses))
try:
- return q.one()
+ # FIXME(TheJulia): This needs to be updated to be
+ # an explicit query to identify the node for SQLAlchemy.
+ with _session_for_read() as session:
+ # Always return the first element, since we always
+ # get a tuple from sqlalchemy.
+ return session.execute(q).one()[0]
except NoResultFound:
raise exception.NodeNotFound(
_('Node with port addresses %s was not found')
@@ -1424,7 +1573,8 @@ class Connection(api.Connection):
def get_volume_connectors_by_node_id(self, node_id, limit=None,
marker=None, sort_key=None,
sort_dir=None, project=None):
- query = model_query(models.VolumeConnector).filter_by(node_id=node_id)
+ query = model_query(models.VolumeConnector).where(
+ models.VolumeConnector.node_id == node_id)
if project:
add_volume_conn_filter_by_node_project(query, project)
return _paginate_query(models.VolumeConnector, limit, marker,
@@ -1492,7 +1642,8 @@ class Connection(api.Connection):
sort_key, sort_dir, query)
def get_volume_target_by_id(self, db_id):
- query = model_query(models.VolumeTarget).filter_by(id=db_id)
+ query = model_query(models.VolumeTarget).where(
+ models.VolumeTarget.id == db_id)
try:
return query.one()
except NoResultFound:
@@ -1517,7 +1668,8 @@ class Connection(api.Connection):
def get_volume_targets_by_volume_id(self, volume_id, limit=None,
marker=None, sort_key=None,
sort_dir=None, project=None):
- query = model_query(models.VolumeTarget).filter_by(volume_id=volume_id)
+ query = model_query(models.VolumeTarget).where(
+ models.VolumeTarget.volume_id == volume_id)
if project:
query = add_volume_target_filter_by_node_project(query, project)
return _paginate_query(models.VolumeTarget, limit, marker, sort_key,
@@ -1586,6 +1738,8 @@ class Connection(api.Connection):
if not versions:
return []
+ if model_name == 'Node':
+ model_name = 'NodeBase'
model = models.get_class(model_name)
# NOTE(rloo): .notin_ does not handle null:
@@ -1614,7 +1768,11 @@ class Connection(api.Connection):
"""
object_versions = release_mappings.get_object_versions()
table_missing_ok = False
- for model in models.Base.__subclasses__():
+ models_to_check = models.Base.__subclasses__()
+ # We need to append Node to the list as it is a subclass of
+ # NodeBase, which is intentional to delineate excess queries.
+ models_to_check.append(models.Node)
+ for model in models_to_check:
if model.__name__ not in object_versions:
continue
@@ -1688,8 +1846,9 @@ class Connection(api.Connection):
mapping = release_mappings.RELEASE_MAPPING['master']['objects']
total_to_migrate = 0
total_migrated = 0
-
- sql_models = [model for model in models.Base.__subclasses__()
+ all_models = models.Base.__subclasses__()
+ all_models.append(models.Node)
+ sql_models = [model for model in all_models
if model.__name__ in mapping]
for model in sql_models:
version = mapping[model.__name__][0]
@@ -2221,29 +2380,29 @@ class Connection(api.Connection):
# this does not work with PostgreSQL.
query = model_query(models.DeployTemplate)
query = add_identity_filter(query, template_id)
- try:
- ref = query.with_for_update().one()
- except NoResultFound:
- raise exception.DeployTemplateNotFound(
- template=template_id)
-
+ ref = query.with_for_update().one()
# First, update non-step columns.
steps = values.pop('steps', None)
ref.update(values)
-
# If necessary, update steps.
if steps is not None:
self._update_deploy_template_steps(session, ref.id, steps)
+ session.flush()
+ with _session_for_read() as session:
# Return the updated template joined with all relevant fields.
- query = _get_deploy_template_query_with_steps()
+ query = _get_deploy_template_select_with_steps()
query = add_identity_filter(query, template_id)
- return query.one()
+ return session.execute(query).one()[0]
except db_exc.DBDuplicateEntry as e:
if 'name' in e.columns:
raise exception.DeployTemplateDuplicateName(
name=values['name'])
raise
+ except NoResultFound:
+ # TODO(TheJulia): What would unified core raise?!?
+ raise exception.DeployTemplateNotFound(
+ template=template_id)
@oslo_db_api.retry_on_deadlock
def destroy_deploy_template(self, template_id):
@@ -2257,21 +2416,26 @@ class Connection(api.Connection):
def _get_deploy_template(self, field, value):
"""Helper method for retrieving a deploy template."""
- query = (_get_deploy_template_query_with_steps()
- .filter_by(**{field: value}))
+ query = (_get_deploy_template_select_with_steps()
+ .where(field == value))
try:
- return query.one()
+ # FIXME(TheJulia): This needs to be fixed for SQLAlchemy 2.0
+ with _session_for_read() as session:
+ return session.execute(query).one()[0]
except NoResultFound:
raise exception.DeployTemplateNotFound(template=value)
def get_deploy_template_by_id(self, template_id):
- return self._get_deploy_template('id', template_id)
+ return self._get_deploy_template(models.DeployTemplate.id,
+ template_id)
def get_deploy_template_by_uuid(self, template_uuid):
- return self._get_deploy_template('uuid', template_uuid)
+ return self._get_deploy_template(models.DeployTemplate.uuid,
+ template_uuid)
def get_deploy_template_by_name(self, template_name):
- return self._get_deploy_template('name', template_name)
+ return self._get_deploy_template(models.DeployTemplate.name,
+ template_name)
def get_deploy_template_list(self, limit=None, marker=None,
sort_key=None, sort_dir=None):
@@ -2280,9 +2444,14 @@ class Connection(api.Connection):
sort_key, sort_dir, query)
def get_deploy_template_list_by_names(self, names):
- query = (_get_deploy_template_query_with_steps()
- .filter(models.DeployTemplate.name.in_(names)))
- return query.all()
+ query = _get_deploy_template_select_with_steps()
+ with _session_for_read() as session:
+ res = session.execute(
+ query.where(
+ models.DeployTemplate.name.in_(names)
+ )
+ ).all()
+ return [r[0] for r in res]
@oslo_db_api.retry_on_deadlock
def create_node_history(self, values):
@@ -2329,7 +2498,7 @@ class Connection(api.Connection):
def get_node_history_by_node_id(self, node_id, limit=None, marker=None,
sort_key=None, sort_dir=None):
query = model_query(models.NodeHistory)
- query = query.filter_by(node_id=node_id)
+ query = query.where(models.NodeHistory.node_id == node_id)
return _paginate_query(models.NodeHistory, limit, marker,
sort_key, sort_dir, query)
@@ -2396,6 +2565,9 @@ class Connection(api.Connection):
# Uses input entry list, selects entries matching those ids
# then deletes them and does not synchronize the session so
# sqlalchemy doesn't do extra un-necessary work.
+ # NOTE(TheJulia): This is "legacy" syntax, but it is still
+ # valid and under the hood SQLAlchemy rewrites the form into
+ # a delete syntax.
session.query(
models.NodeHistory
).filter(
@@ -2414,13 +2586,12 @@ class Connection(api.Connection):
# literally have the DB do *all* of the world, so no
# client side ops occur. The column is also indexed,
# which means this will be an index based response.
- # TODO(TheJulia): This might need to be revised for
- # SQLAlchemy 2.0 as it should be a scaler select and count
- # instead.
- return session.query(
- models.Node.provision_state
- ).filter(
- or_(
- models.Node.provision_state == v for v in state
+ return session.scalar(
+ sa.select(
+ sa.func.count(models.Node.id)
+ ).filter(
+ or_(
+ models.Node.provision_state == v for v in state
+ )
)
- ).count()
+ )