diff options
Diffstat (limited to 'ironic/db/sqlalchemy/api.py')
-rw-r--r-- | ironic/db/sqlalchemy/api.py | 549 |
1 files changed, 360 insertions, 189 deletions
diff --git a/ironic/db/sqlalchemy/api.py b/ironic/db/sqlalchemy/api.py index c14719af8..b05af3637 100644 --- a/ironic/db/sqlalchemy/api.py +++ b/ironic/db/sqlalchemy/api.py @@ -19,9 +19,11 @@ import datetime import json import threading +from oslo_concurrency import lockutils from oslo_db import api as oslo_db_api from oslo_db import exception as db_exc from oslo_db.sqlalchemy import enginefacade +from oslo_db.sqlalchemy import orm as sa_orm from oslo_db.sqlalchemy import utils as db_utils from oslo_log import log from oslo_utils import netutils @@ -31,7 +33,7 @@ from oslo_utils import uuidutils from osprofiler import sqlalchemy as osp_sqlalchemy import sqlalchemy as sa from sqlalchemy import or_ -from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound +from sqlalchemy.exc import NoResultFound, MultipleResultsFound from sqlalchemy.orm import joinedload from sqlalchemy.orm import Load from sqlalchemy.orm import selectinload @@ -53,6 +55,10 @@ LOG = log.getLogger(__name__) _CONTEXT = threading.local() + +RESERVATION_SEMAPHORE = "reserve_node_db_lock" +synchronized = lockutils.synchronized_with_prefix('ironic-') + # NOTE(mgoddard): We limit the number of traits per node to 50 as this is the # maximum number of traits per resource provider allowed in placement. MAX_TRAITS_PER_NODE = 50 @@ -83,6 +89,11 @@ def _wrap_session(session): def _get_node_query_with_all_for_single_node(): """Return a query object for the Node joined with all relevant fields. + Deprecated: This method, while useful, returns a "Legacy Query" object + which, while useful is considered a legacy object from SQLAlchemy + which at some point may be removed. SQLAlchemy encourages all users + to move to the unified ORM/Core Select interface. + This method utilizes a joined load query which creates a result set where corresponding traits, and tags, are joined together in the result set. @@ -109,9 +120,10 @@ def _get_node_query_with_all_for_single_node(): Where joins are super in-efficent for Ironic, is where nodes are being enumerated, as the above result set pattern is not just for one node, but - potentially thousands of nodes. In that case, we should use the - _get_node_query_with_all_for_list helper to return a more appropriate - query object which will be more efficient for the end user. + potentially thousands of nodes. Please consider using _get_node_select + which results in a primary query for the nodes, and then performs + additional targeted queries for the joined tables, as opposed to + performing client side de-duplication. :returns: a query object. """ @@ -127,49 +139,47 @@ def _get_node_query_with_all_for_single_node(): # 2.43 seconds to obtain all nodes from SQLAlchemy (10k nodes) # 5.15 seconds to obtain all nodes *and* have node objects (10k nodes) return (model_query(models.Node) - .options(joinedload('tags')) - .options(joinedload('traits'))) + .options(joinedload(models.Node.tags)) + .options(joinedload(models.Node.traits))) -def _get_node_query_with_all_for_list(): - """Return a query object for the Node with queried extra fields. +def _get_node_select(): + """Returns a SQLAlchemy Select Object for Nodes. - This method returns a query object joining tags and traits in a pattern - where the result set is first built, and then the resulting associations - are queried separately and the objects are reconciled by SQLAlchemy to - build the composite objects based upon the associations. + This method returns a pre-formatted select object which models + the entire Node object, allowing callers to operate on a node like + they would have with an SQLAlchemy ORM Query Object. - This results in the following query pattern when the query is executed: + This object *also* performs two additional select queries, in the form + of a selectin operation, to achieve the same results of a Join query, + but without the join query itself, and the client side load. - select $fields from nodes where x; - # SQLAlchemy creates a list of associated node IDs. - select $fields from tags where node_id in ('1', '3', '37268'); - select $fields from traits where node_id in ('1', '3', '37268'); + This method is best utilized when retrieving lists of nodes. - SQLAlchemy then returns a result set where the tags and traits are - composited together efficently as opposed to having to deduplicate - the result set. This shifts additional load to the database which - was previously a high overhead operation with-in the conductor... - which results in a slower conductor. + Select objects in this fashion were added as a result of SQLAlchemy 1.4 + in preparation for SQLAlchemy 2.0's release to provide a unified + select interface. - :returns: a query object. + :returns: a select object """ - # NOTE(TheJulia): When comparing CI rubs *with* this being the default - # for all general list operations, at 10k nodes, this pattern appears - # to be on-par with a 5% variability between the two example benchmark - # tests. That being said, the test *does* not include tags or traits - # in it's test data set so client side deduplication is not measured. - # NOTE(TheJulia): Basic benchmark difference - # tests data creation: 67.117 seconds - # 2.32 seconds to obtain all nodes from SQLAlchemy (10k nodes) - # 4.99 seconds to obtain all nodes *and* have node objects (10k nodes) - # If this holds true, the required record deduplication with joinedload - # may be basically the same amount of overhead as requesting the tags - # and traits separately. - return (model_query(models.Node) - .options(selectinload('tags')) - .options(selectinload('traits'))) + # NOTE(TheJulia): This returns a query in the SQLAlchemy 1.4->2.0 + # migration style as query model loading is deprecated. + + # This must use selectinload to avoid later need to invokededuplication. + return (sa.select(models.Node) + .options(selectinload(models.Node.tags), + selectinload(models.Node.traits))) + + +def _get_deploy_template_select_with_steps(): + """Return a select object for the DeployTemplate joined with steps. + + :returns: a select object. + """ + return sa.select( + models.DeployTemplate + ).options(selectinload(models.DeployTemplate.steps)) def _get_deploy_template_query_with_steps(): @@ -177,7 +187,8 @@ def _get_deploy_template_query_with_steps(): :returns: a query object. """ - return model_query(models.DeployTemplate).options(joinedload('steps')) + return model_query(models.DeployTemplate).options( + selectinload(models.DeployTemplate.steps)) def model_query(model, *args, **kwargs): @@ -209,6 +220,26 @@ def add_identity_filter(query, value): raise exception.InvalidIdentity(identity=value) +def add_identity_where(op, model, value): + """Adds an identity filter to operation for where method. + + Filters results by ID, if supplied value is a valid integer. + Otherwise attempts to filter results by UUID. + + :param op: Initial operation to add filter to. + i.e. a update or delete statement. + :param model: The SQLAlchemy model to apply. + :param value: Value for filtering results by. + :return: Modified query. + """ + if strutils.is_int_like(value): + return op.where(model.id == value) + elif uuidutils.is_uuid_like(value): + return op.where(model.uuid == value) + else: + raise exception.InvalidIdentity(identity=value) + + def add_port_filter(query, value): """Adds a port-specific filter to a query. @@ -281,7 +312,7 @@ def add_portgroup_filter(query, value): if netutils.is_valid_mac(value): return query.filter_by(address=value) else: - return add_identity_filter(query, value) + return add_identity_where(query, models.Portgroup, value) def add_portgroup_filter_by_node(query, value): @@ -332,8 +363,10 @@ def add_allocation_filter_by_conductor(query, value): def _paginate_query(model, limit=None, marker=None, sort_key=None, - sort_dir=None, query=None): - if not query: + sort_dir=None, query=None, return_base_tuple=False): + # NOTE(TheJulia): We can't just ask for the bool of query if it is + # populated, so we need to ask if it is None. + if query is None: query = model_query(model) sort_keys = ['id'] if sort_key and sort_key not in sort_keys: @@ -345,7 +378,28 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None, raise exception.InvalidParameterValue( _('The sort_key value "%(key)s" is an invalid field for sorting') % {'key': sort_key}) - return query.all() + with _session_for_read() as session: + # NOTE(TheJulia): SQLAlchemy 2.0 no longer returns pre-uniqued result + # sets in ORM mode, so we need to explicitly ask for it to be unique + # before returning it to the caller. + if isinstance(query, sa_orm.Query): + # The classic "Legacy" ORM query object result set which is + # deprecated in advance of SQLAlchemy 2.0. + return query.all() + else: + # In this case, we have a sqlalchemy.sql.selectable.Select + # (most likely) which utilizes the unified select interface. + res = session.execute(query).fetchall() + if len(res) == 0: + # Return an empty list instead of a class with no objects. + return [] + if return_base_tuple: + # The caller expects a tuple, lets just give it to them. + return res + # Everything is a tuple in a resultset from the unified interface + # but for objects, our model expects just object access, + # so we extract and return them. + return [r[0] for r in res] def _filter_active_conductors(query, interval=None): @@ -514,15 +568,16 @@ class Connection(api.Connection): else: columns = [getattr(models.Node, c) for c in columns] - query = model_query(*columns, base_model=models.Node) + query = sa.select(*columns) query = self._add_nodes_filters(query, filters) return _paginate_query(models.Node, limit, marker, - sort_key, sort_dir, query) + sort_key, sort_dir, query, + return_base_tuple=True) def get_node_list(self, filters=None, limit=None, marker=None, sort_key=None, sort_dir=None, fields=None): if not fields: - query = _get_node_query_with_all_for_list() + query = _get_node_select() query = self._add_nodes_filters(query, filters) return _paginate_query(models.Node, limit, marker, sort_key, sort_dir, query) @@ -559,24 +614,25 @@ class Connection(api.Connection): # with SQLAlchemy. traits_found = True use_columns.remove('traits') - # Generate the column object list so SQLAlchemy only fulfills # the requested columns. use_columns = [getattr(models.Node, c) for c in use_columns] - # In essence, traits (and anything else needed to generate the # composite objects) need to be reconciled without using a join # as multiple rows can be generated in the result set being returned # from the database server. In this case, with traits, we use # a selectinload pattern. if traits_found: - query = model_query(models.Node).options( - Load(models.Node).load_only(*use_columns), - selectinload(models.Node.traits)) + query = sa.select(models.Node).options( + selectinload(models.Node.traits), + Load(models.Node).load_only(*use_columns) + ) else: - query = model_query(models.Node).options( - Load(models.Node).load_only(*use_columns)) - + # Note for others, if you ask for a whole model, it is + # modeled, i.e. you can access it as an object. + query = sa.select(models.NodeBase).options( + Load(models.Node).load_only(*use_columns) + ) query = self._add_nodes_filters(query, filters) return _paginate_query(models.Node, limit, marker, sort_key, sort_dir, query) @@ -618,40 +674,85 @@ class Connection(api.Connection): return mapping + @synchronized(RESERVATION_SEMAPHORE, fair=True) + def _reserve_node_place_lock(self, tag, node_id, node): + try: + # NOTE(TheJulia): We explicitly do *not* synch the session + # so the other actions in the conductor do not become aware + # that the lock is in place and believe they hold the lock. + # This necessitates an overall lock in the code side, so + # we avoid conditions where two separate threads can believe + # they hold locks at the same time. + with _session_for_write() as session: + res = session.execute( + sa.update(models.Node). + where(models.Node.id == node.id). + where(models.Node.reservation == None). # noqa + values(reservation=tag). + execution_options(synchronize_session=False)) + session.flush() + node = self._get_node_by_id_no_joins(node.id) + # NOTE(TheJulia): In SQLAlchemy 2.0 style, we don't + # magically get a changed node as they moved from the + # many ways to do things to singular ways to do things. + if res.rowcount != 1: + # Nothing updated and node exists. Must already be + # locked. + raise exception.NodeLocked(node=node.uuid, + host=node.reservation) + except NoResultFound: + # In the event that someone has deleted the node on + # another thread. + raise exception.NodeNotFound(node=node_id) + @oslo_db_api.retry_on_deadlock def reserve_node(self, tag, node_id): - with _session_for_write(): - query = _get_node_query_with_all_for_single_node() - query = add_identity_filter(query, node_id) - count = query.filter_by(reservation=None).update( - {'reservation': tag}, synchronize_session=False) + with _session_for_read(): try: + # TODO(TheJulia): Figure out a good way to query + # this so that we do it as light as possible without + # the full object invocation, which will speed lock + # activities. Granted, this is all at the DB level + # so maybe that is okay in the grand scheme of things. + query = model_query(models.Node) + query = add_identity_filter(query, node_id) node = query.one() - if count != 1: - # Nothing updated and node exists. Must already be - # locked. - raise exception.NodeLocked(node=node.uuid, - host=node['reservation']) - return node except NoResultFound: raise exception.NodeNotFound(node=node_id) + if node.reservation: + # Fail fast, instead of attempt the update. + raise exception.NodeLocked(node=node.uuid, + host=node.reservation) + self._reserve_node_place_lock(tag, node_id, node) + # Return a node object as that is the contract for this method. + return self.get_node_by_id(node.id) @oslo_db_api.retry_on_deadlock def release_node(self, tag, node_id): - with _session_for_write(): - query = model_query(models.Node) - query = add_identity_filter(query, node_id) - # be optimistic and assume we usually release a reservation - count = query.filter_by(reservation=tag).update( - {'reservation': None}, synchronize_session=False) + with _session_for_read(): try: - if count != 1: - node = query.one() - if node['reservation'] is None: + query = model_query(models.Node) + query = add_identity_filter(query, node_id) + node = query.one() + except NoResultFound: + raise exception.NodeNotFound(node=node_id) + with _session_for_write() as session: + try: + res = session.execute( + sa.update(models.Node). + where(models.Node.id == node.id). + where(models.Node.reservation == tag). + values(reservation=None). + execution_options(synchronize_session=False) + ) + node = self.get_node_by_id(node.id) + if res.rowcount != 1: + if node.reservation is None: raise exception.NodeNotLocked(node=node.uuid) else: raise exception.NodeLocked(node=node.uuid, host=node['reservation']) + session.flush() except NoResultFound: raise exception.NodeNotFound(node=node_id) @@ -677,47 +778,68 @@ class Connection(api.Connection): node = models.Node() node.update(values) - with _session_for_write() as session: - try: + try: + with _session_for_write() as session: session.add(node) + # Set tags & traits to [] for new created node + # NOTE(mgoddard): We need to set the tags and traits fields in + # the session context, otherwise SQLAlchemy will try and fail + # to lazy load the attributes, resulting in an exception being + # raised. + node['tags'] = [] + node['traits'] = [] session.flush() - except db_exc.DBDuplicateEntry as exc: - if 'name' in exc.columns: - raise exception.DuplicateName(name=values['name']) - elif 'instance_uuid' in exc.columns: - raise exception.InstanceAssociated( - instance_uuid=values['instance_uuid'], - node=values['uuid']) - raise exception.NodeAlreadyExists(uuid=values['uuid']) - # Set tags & traits to [] for new created node - # NOTE(mgoddard): We need to set the tags and traits fields in the - # session context, otherwise SQLAlchemy will try and fail to lazy - # load the attributes, resulting in an exception being raised. - node['tags'] = [] - node['traits'] = [] + except db_exc.DBDuplicateEntry as exc: + if 'name' in exc.columns: + raise exception.DuplicateName(name=values['name']) + elif 'instance_uuid' in exc.columns: + raise exception.InstanceAssociated( + instance_uuid=values['instance_uuid'], + node=values['uuid']) + raise exception.NodeAlreadyExists(uuid=values['uuid']) return node + def _get_node_by_id_no_joins(self, node_id): + # TODO(TheJulia): Maybe replace with this with a minimal + # "get these three fields" thing. + try: + with _session_for_read() as session: + # Explicitly load NodeBase as the invocation of the + # priamary model object reesults in the join query + # triggering. + return session.execute( + sa.select(models.NodeBase).filter_by(id=node_id).limit(1) + ).scalars().first() + except NoResultFound: + raise exception.NodeNotFound(node=node_id) + def get_node_by_id(self, node_id): - query = _get_node_query_with_all_for_single_node() - query = query.filter_by(id=node_id) try: - return query.one() + query = _get_node_select() + with _session_for_read() as session: + return session.scalars( + query.filter_by(id=node_id).limit(1) + ).unique().one() except NoResultFound: raise exception.NodeNotFound(node=node_id) def get_node_by_uuid(self, node_uuid): - query = _get_node_query_with_all_for_single_node() - query = query.filter_by(uuid=node_uuid) try: - return query.one() + query = _get_node_select() + with _session_for_read() as session: + return session.scalars( + query.filter_by(uuid=node_uuid).limit(1) + ).unique().one() except NoResultFound: raise exception.NodeNotFound(node=node_uuid) def get_node_by_name(self, node_name): - query = _get_node_query_with_all_for_single_node() - query = query.filter_by(name=node_name) try: - return query.one() + query = _get_node_select() + with _session_for_read() as session: + return session.scalars( + query.filter_by(name=node_name).limit(1) + ).unique().one() except NoResultFound: raise exception.NodeNotFound(node=node_name) @@ -725,15 +847,14 @@ class Connection(api.Connection): if not uuidutils.is_uuid_like(instance): raise exception.InvalidUUID(uuid=instance) - query = _get_node_query_with_all_for_single_node() - query = query.filter_by(instance_uuid=instance) - try: - result = query.one() + query = _get_node_select() + with _session_for_read() as session: + return session.scalars( + query.filter_by(instance_uuid=instance).limit(1) + ).unique().one() except NoResultFound: - raise exception.InstanceNotFound(instance=instance) - - return result + raise exception.InstanceNotFound(instance_uuid=instance) @oslo_db_api.retry_on_deadlock def destroy_node(self, node_id): @@ -846,10 +967,14 @@ class Connection(api.Connection): ref.update(values) - # Return the updated node model joined with all relevant fields. - query = _get_node_query_with_all_for_single_node() - query = add_identity_filter(query, node_id) - return query.one() + # Return the updated node model joined with all relevant fields. + query = _get_node_select() + query = add_identity_filter(query, node_id) + # FIXME(TheJulia): This entire method needs to be re-written to + # use the proper execution format for SQLAlchemy 2.0. Likely + # A query, independent update, and a re-query on the transaction. + with _session_for_read() as session: + return session.execute(query).one()[0] def get_port_by_id(self, port_id): query = model_query(models.Port).filter_by(id=port_id) @@ -886,7 +1011,7 @@ class Connection(api.Connection): def get_port_list(self, limit=None, marker=None, sort_key=None, sort_dir=None, owner=None, project=None): - query = model_query(models.Port) + query = sa.select(models.Port) if owner: query = add_port_filter_by_node_owner(query, owner) elif project: @@ -897,8 +1022,7 @@ class Connection(api.Connection): def get_ports_by_node_id(self, node_id, limit=None, marker=None, sort_key=None, sort_dir=None, owner=None, project=None): - query = model_query(models.Port) - query = query.filter_by(node_id=node_id) + query = sa.select(models.Port).where(models.Port.node_id == node_id) if owner: query = add_port_filter_by_node_owner(query, owner) elif project: @@ -909,8 +1033,10 @@ class Connection(api.Connection): def get_ports_by_portgroup_id(self, portgroup_id, limit=None, marker=None, sort_key=None, sort_dir=None, owner=None, project=None): - query = model_query(models.Port) - query = query.filter_by(portgroup_id=portgroup_id) + query = sa.select(models.Port).where( + models.Port.portgroup_id == portgroup_id + ) + if owner: query = add_port_filter_by_node_owner(query, owner) elif project: @@ -925,15 +1051,15 @@ class Connection(api.Connection): port = models.Port() port.update(values) - with _session_for_write() as session: - try: + try: + with _session_for_write() as session: session.add(port) session.flush() - except db_exc.DBDuplicateEntry as exc: - if 'address' in exc.columns: - raise exception.MACAlreadyExists(mac=values['address']) - raise exception.PortAlreadyExists(uuid=values['uuid']) - return port + except db_exc.DBDuplicateEntry as exc: + if 'address' in exc.columns: + raise exception.MACAlreadyExists(mac=values['address']) + raise exception.PortAlreadyExists(uuid=values['uuid']) + return port @oslo_db_api.retry_on_deadlock def update_port(self, port_id, values): @@ -941,7 +1067,6 @@ class Connection(api.Connection): if 'uuid' in values: msg = _("Cannot overwrite UUID for an existing Port.") raise exception.InvalidParameterValue(err=msg) - try: with _session_for_write() as session: query = model_query(models.Port) @@ -1010,7 +1135,7 @@ class Connection(api.Connection): def get_portgroups_by_node_id(self, node_id, limit=None, marker=None, sort_key=None, sort_dir=None, project=None): query = model_query(models.Portgroup) - query = query.filter_by(node_id=node_id) + query = query.where(models.Portgroup.node_id == node_id) if project: query = add_portgroup_filter_by_node_project(query, project) return _paginate_query(models.Portgroup, limit, marker, @@ -1067,34 +1192,40 @@ class Connection(api.Connection): def destroy_portgroup(self, portgroup_id): def portgroup_not_empty(session): """Checks whether the portgroup does not have ports.""" - - query = model_query(models.Port) - query = add_port_filter_by_portgroup(query, portgroup_id) - - return query.count() != 0 + with _session_for_read() as session: + return session.scalar( + sa.select( + sa.func.count(models.Port.id) + ).where(models.Port.portgroup_id == portgroup_id)) != 0 with _session_for_write() as session: if portgroup_not_empty(session): raise exception.PortgroupNotEmpty(portgroup=portgroup_id) - query = model_query(models.Portgroup, session=session) - query = add_identity_filter(query, portgroup_id) + query = sa.delete(models.Portgroup) + query = add_identity_where(query, models.Portgroup, portgroup_id) - count = query.delete() + count = session.execute(query).rowcount if count == 0: raise exception.PortgroupNotFound(portgroup=portgroup_id) def get_chassis_by_id(self, chassis_id): - query = model_query(models.Chassis).filter_by(id=chassis_id) + query = sa.select(models.Chassis).where( + models.Chassis.id == chassis_id) + try: - return query.one() + with _session_for_read() as session: + return session.execute(query).one()[0] except NoResultFound: raise exception.ChassisNotFound(chassis=chassis_id) def get_chassis_by_uuid(self, chassis_uuid): - query = model_query(models.Chassis).filter_by(uuid=chassis_uuid) + query = sa.select(models.Chassis).where( + models.Chassis.uuid == chassis_uuid) + try: - return query.one() + with _session_for_read() as session: + return session.execute(query).one()[0] except NoResultFound: raise exception.ChassisNotFound(chassis=chassis_uuid) @@ -1110,13 +1241,13 @@ class Connection(api.Connection): chassis = models.Chassis() chassis.update(values) - with _session_for_write() as session: - try: + try: + with _session_for_write() as session: session.add(chassis) session.flush() - except db_exc.DBDuplicateEntry: - raise exception.ChassisAlreadyExists(uuid=values['uuid']) - return chassis + except db_exc.DBDuplicateEntry: + raise exception.ChassisAlreadyExists(uuid=values['uuid']) + return chassis @oslo_db_api.retry_on_deadlock def update_chassis(self, chassis_id, values): @@ -1127,7 +1258,7 @@ class Connection(api.Connection): with _session_for_write(): query = model_query(models.Chassis) - query = add_identity_filter(query, chassis_id) + query = add_identity_where(query, models.Chassis, chassis_id) count = query.update(values) if count != 1: @@ -1183,27 +1314,32 @@ class Connection(api.Connection): def get_conductor(self, hostname, online=True): try: - query = model_query(models.Conductor).filter_by(hostname=hostname) + query = sa.select(models.Conductor).where( + models.Conductor.hostname == hostname) if online is not None: - query = query.filter_by(online=online) - return query.one() + query = query.where(models.Conductor.online == online) + with _session_for_read() as session: + res = session.execute(query).one()[0] + return res except NoResultFound: raise exception.ConductorNotFound(conductor=hostname) @oslo_db_api.retry_on_deadlock def unregister_conductor(self, hostname): - with _session_for_write(): - query = (model_query(models.Conductor) - .filter_by(hostname=hostname, online=True)) - count = query.update({'online': False}) + with _session_for_write() as session: + query = sa.update(models.Conductor).where( + models.Conductor.hostname == hostname, + models.Conductor.online == True).values( # noqa + online=False) + count = session.execute(query).rowcount if count == 0: raise exception.ConductorNotFound(conductor=hostname) @oslo_db_api.retry_on_deadlock def touch_conductor(self, hostname): with _session_for_write(): - query = (model_query(models.Conductor) - .filter_by(hostname=hostname)) + query = model_query(models.Conductor) + query = query.where(models.Conductor.hostname == hostname) # since we're not changing any other field, manually set updated_at # and since we're heartbeating, make sure that online=True count = query.update({'updated_at': timeutils.utcnow(), @@ -1278,7 +1414,7 @@ class Connection(api.Connection): def list_conductor_hardware_interfaces(self, conductor_id): query = (model_query(models.ConductorHardwareInterfaces) - .filter_by(conductor_id=conductor_id)) + .where(models.ConductorHardwareInterfaces.conductor_id == conductor_id)) # noqa return query.all() def list_hardware_type_interfaces(self, hardware_types): @@ -1293,6 +1429,13 @@ class Connection(api.Connection): def register_conductor_hardware_interfaces(self, conductor_id, interfaces): with _session_for_write() as session: try: + try: + session.begin() + except sa.exc.InvalidRequestError: + # When running unit tests, the transaction reports as + # already started, where as in service startup this is + # the first write op. + pass for iface in interfaces: conductor_hw_iface = models.ConductorHardwareInterfaces() conductor_hw_iface['conductor_id'] = conductor_id @@ -1322,7 +1465,8 @@ class Connection(api.Connection): raise exception.NodeNotFound(node=node_id) def _check_node_exists(self, node_id): - if not model_query(models.Node).filter_by(id=node_id).scalar(): + if not model_query(models.Node).where( + models.Node.id == node_id).scalar(): raise exception.NodeNotFound(node=node_id) @oslo_db_api.retry_on_deadlock @@ -1383,12 +1527,17 @@ class Connection(api.Connection): return model_query(q.exists()).scalar() def get_node_by_port_addresses(self, addresses): - q = _get_node_query_with_all_for_single_node() + q = _get_node_select() q = q.distinct().join(models.Port) q = q.filter(models.Port.address.in_(addresses)) try: - return q.one() + # FIXME(TheJulia): This needs to be updated to be + # an explicit query to identify the node for SQLAlchemy. + with _session_for_read() as session: + # Always return the first element, since we always + # get a tuple from sqlalchemy. + return session.execute(q).one()[0] except NoResultFound: raise exception.NodeNotFound( _('Node with port addresses %s was not found') @@ -1424,7 +1573,8 @@ class Connection(api.Connection): def get_volume_connectors_by_node_id(self, node_id, limit=None, marker=None, sort_key=None, sort_dir=None, project=None): - query = model_query(models.VolumeConnector).filter_by(node_id=node_id) + query = model_query(models.VolumeConnector).where( + models.VolumeConnector.node_id == node_id) if project: add_volume_conn_filter_by_node_project(query, project) return _paginate_query(models.VolumeConnector, limit, marker, @@ -1492,7 +1642,8 @@ class Connection(api.Connection): sort_key, sort_dir, query) def get_volume_target_by_id(self, db_id): - query = model_query(models.VolumeTarget).filter_by(id=db_id) + query = model_query(models.VolumeTarget).where( + models.VolumeTarget.id == db_id) try: return query.one() except NoResultFound: @@ -1517,7 +1668,8 @@ class Connection(api.Connection): def get_volume_targets_by_volume_id(self, volume_id, limit=None, marker=None, sort_key=None, sort_dir=None, project=None): - query = model_query(models.VolumeTarget).filter_by(volume_id=volume_id) + query = model_query(models.VolumeTarget).where( + models.VolumeTarget.volume_id == volume_id) if project: query = add_volume_target_filter_by_node_project(query, project) return _paginate_query(models.VolumeTarget, limit, marker, sort_key, @@ -1586,6 +1738,8 @@ class Connection(api.Connection): if not versions: return [] + if model_name == 'Node': + model_name = 'NodeBase' model = models.get_class(model_name) # NOTE(rloo): .notin_ does not handle null: @@ -1614,7 +1768,11 @@ class Connection(api.Connection): """ object_versions = release_mappings.get_object_versions() table_missing_ok = False - for model in models.Base.__subclasses__(): + models_to_check = models.Base.__subclasses__() + # We need to append Node to the list as it is a subclass of + # NodeBase, which is intentional to delineate excess queries. + models_to_check.append(models.Node) + for model in models_to_check: if model.__name__ not in object_versions: continue @@ -1688,8 +1846,9 @@ class Connection(api.Connection): mapping = release_mappings.RELEASE_MAPPING['master']['objects'] total_to_migrate = 0 total_migrated = 0 - - sql_models = [model for model in models.Base.__subclasses__() + all_models = models.Base.__subclasses__() + all_models.append(models.Node) + sql_models = [model for model in all_models if model.__name__ in mapping] for model in sql_models: version = mapping[model.__name__][0] @@ -2221,29 +2380,29 @@ class Connection(api.Connection): # this does not work with PostgreSQL. query = model_query(models.DeployTemplate) query = add_identity_filter(query, template_id) - try: - ref = query.with_for_update().one() - except NoResultFound: - raise exception.DeployTemplateNotFound( - template=template_id) - + ref = query.with_for_update().one() # First, update non-step columns. steps = values.pop('steps', None) ref.update(values) - # If necessary, update steps. if steps is not None: self._update_deploy_template_steps(session, ref.id, steps) + session.flush() + with _session_for_read() as session: # Return the updated template joined with all relevant fields. - query = _get_deploy_template_query_with_steps() + query = _get_deploy_template_select_with_steps() query = add_identity_filter(query, template_id) - return query.one() + return session.execute(query).one()[0] except db_exc.DBDuplicateEntry as e: if 'name' in e.columns: raise exception.DeployTemplateDuplicateName( name=values['name']) raise + except NoResultFound: + # TODO(TheJulia): What would unified core raise?!? + raise exception.DeployTemplateNotFound( + template=template_id) @oslo_db_api.retry_on_deadlock def destroy_deploy_template(self, template_id): @@ -2257,21 +2416,26 @@ class Connection(api.Connection): def _get_deploy_template(self, field, value): """Helper method for retrieving a deploy template.""" - query = (_get_deploy_template_query_with_steps() - .filter_by(**{field: value})) + query = (_get_deploy_template_select_with_steps() + .where(field == value)) try: - return query.one() + # FIXME(TheJulia): This needs to be fixed for SQLAlchemy 2.0 + with _session_for_read() as session: + return session.execute(query).one()[0] except NoResultFound: raise exception.DeployTemplateNotFound(template=value) def get_deploy_template_by_id(self, template_id): - return self._get_deploy_template('id', template_id) + return self._get_deploy_template(models.DeployTemplate.id, + template_id) def get_deploy_template_by_uuid(self, template_uuid): - return self._get_deploy_template('uuid', template_uuid) + return self._get_deploy_template(models.DeployTemplate.uuid, + template_uuid) def get_deploy_template_by_name(self, template_name): - return self._get_deploy_template('name', template_name) + return self._get_deploy_template(models.DeployTemplate.name, + template_name) def get_deploy_template_list(self, limit=None, marker=None, sort_key=None, sort_dir=None): @@ -2280,9 +2444,14 @@ class Connection(api.Connection): sort_key, sort_dir, query) def get_deploy_template_list_by_names(self, names): - query = (_get_deploy_template_query_with_steps() - .filter(models.DeployTemplate.name.in_(names))) - return query.all() + query = _get_deploy_template_select_with_steps() + with _session_for_read() as session: + res = session.execute( + query.where( + models.DeployTemplate.name.in_(names) + ) + ).all() + return [r[0] for r in res] @oslo_db_api.retry_on_deadlock def create_node_history(self, values): @@ -2329,7 +2498,7 @@ class Connection(api.Connection): def get_node_history_by_node_id(self, node_id, limit=None, marker=None, sort_key=None, sort_dir=None): query = model_query(models.NodeHistory) - query = query.filter_by(node_id=node_id) + query = query.where(models.NodeHistory.node_id == node_id) return _paginate_query(models.NodeHistory, limit, marker, sort_key, sort_dir, query) @@ -2396,6 +2565,9 @@ class Connection(api.Connection): # Uses input entry list, selects entries matching those ids # then deletes them and does not synchronize the session so # sqlalchemy doesn't do extra un-necessary work. + # NOTE(TheJulia): This is "legacy" syntax, but it is still + # valid and under the hood SQLAlchemy rewrites the form into + # a delete syntax. session.query( models.NodeHistory ).filter( @@ -2414,13 +2586,12 @@ class Connection(api.Connection): # literally have the DB do *all* of the world, so no # client side ops occur. The column is also indexed, # which means this will be an index based response. - # TODO(TheJulia): This might need to be revised for - # SQLAlchemy 2.0 as it should be a scaler select and count - # instead. - return session.query( - models.Node.provision_state - ).filter( - or_( - models.Node.provision_state == v for v in state + return session.scalar( + sa.select( + sa.func.count(models.Node.id) + ).filter( + or_( + models.Node.provision_state == v for v in state + ) ) - ).count() + ) |