diff options
author | Zuul <zuul@review.opendev.org> | 2023-04-13 19:28:27 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2023-04-13 19:28:27 +0000 |
commit | 314118bd616a0d9d4fb008fce69f2dbbc2e875c0 (patch) | |
tree | ada1debd83dbf642e851428a08d74431a63cf39b | |
parent | d3dc83e5238acf0856dbd29f67872efba5d784ce (diff) | |
parent | 77bc3cb7f83b0cc47f04ccda5125613e06690958 (diff) | |
download | taskflow-314118bd616a0d9d4fb008fce69f2dbbc2e875c0.tar.gz |
Merge "Prepare taskflow for sqlalchemy2"
-rw-r--r-- | taskflow/persistence/backends/impl_sqlalchemy.py | 34 |
1 files changed, 21 insertions, 13 deletions
diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 3ac0f3d..742403b 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -205,15 +205,17 @@ class _Alchemist(object): return atom_cls.from_dict(row) def atom_query_iter(self, conn, parent_uuid): - q = (sql.select([self._tables.atomdetails]). + q = (sql.select(self._tables.atomdetails). where(self._tables.atomdetails.c.parent_uuid == parent_uuid)) for row in conn.execute(q): + row = row._mapping yield self.convert_atom_detail(row) def flow_query_iter(self, conn, parent_uuid): - q = (sql.select([self._tables.flowdetails]). + q = (sql.select(self._tables.flowdetails). where(self._tables.flowdetails.c.parent_uuid == parent_uuid)) for row in conn.execute(q): + row = row._mapping yield self.convert_flow_detail(row) def populate_book(self, conn, book): @@ -257,7 +259,6 @@ class SQLAlchemyBackend(base.Backend): conf = copy.deepcopy(conf) engine_args = { 'echo': _as_bool(conf.pop('echo', False)), - 'convert_unicode': _as_bool(conf.pop('convert_unicode', True)), 'pool_recycle': 3600, } if 'idle_timeout' in conf: @@ -421,12 +422,13 @@ class Connection(base.Connection): try: atomdetails = self._tables.atomdetails with self._engine.begin() as conn: - q = (sql.select([atomdetails]). + q = (sql.select(atomdetails). where(atomdetails.c.uuid == atom_detail.uuid)) row = conn.execute(q).first() if not row: raise exc.NotFound("No atom details found with uuid" " '%s'" % atom_detail.uuid) + row = row._mapping e_ad = self._converter.convert_atom_detail(row) self._update_atom_details(conn, atom_detail, e_ad) return e_ad @@ -438,7 +440,7 @@ class Connection(base.Connection): def _insert_flow_details(self, conn, fd, parent_uuid): value = fd.to_dict() value['parent_uuid'] = parent_uuid - conn.execute(sql.insert(self._tables.flowdetails, value)) + conn.execute(sql.insert(self._tables.flowdetails).values(**value)) for ad in fd: self._insert_atom_details(conn, ad, fd.uuid) @@ -446,7 +448,7 @@ class Connection(base.Connection): value = ad.to_dict() value['parent_uuid'] = parent_uuid value['atom_type'] = models.atom_detail_type(ad) - conn.execute(sql.insert(self._tables.atomdetails, value)) + conn.execute(sql.insert(self._tables.atomdetails).values(**value)) def _update_atom_details(self, conn, ad, e_ad): e_ad.merge(ad) @@ -471,12 +473,13 @@ class Connection(base.Connection): try: flowdetails = self._tables.flowdetails with self._engine.begin() as conn: - q = (sql.select([flowdetails]). + q = (sql.select(flowdetails). where(flowdetails.c.uuid == flow_detail.uuid)) row = conn.execute(q).first() if not row: raise exc.NotFound("No flow details found with" " uuid '%s'" % flow_detail.uuid) + row = row._mapping e_fd = self._converter.convert_flow_detail(row) self._converter.populate_flow_detail(conn, e_fd) self._update_flow_details(conn, flow_detail, e_fd) @@ -503,10 +506,11 @@ class Connection(base.Connection): try: logbooks = self._tables.logbooks with self._engine.begin() as conn: - q = (sql.select([logbooks]). + q = (sql.select(logbooks). where(logbooks.c.uuid == book.uuid)) row = conn.execute(q).first() if row: + row = row._mapping e_lb = self._converter.convert_book(row) self._converter.populate_book(conn, e_lb) e_lb.merge(book) @@ -522,7 +526,7 @@ class Connection(base.Connection): self._update_flow_details(conn, fd, e_fd) return e_lb else: - conn.execute(sql.insert(logbooks, book.to_dict())) + conn.execute(sql.insert(logbooks).values(**book.to_dict())) for fd in book: self._insert_flow_details(conn, fd, book.uuid) return book @@ -535,12 +539,13 @@ class Connection(base.Connection): try: logbooks = self._tables.logbooks with contextlib.closing(self._engine.connect()) as conn: - q = (sql.select([logbooks]). + q = (sql.select(logbooks). where(logbooks.c.uuid == book_uuid)) row = conn.execute(q).first() if not row: raise exc.NotFound("No logbook found with" " uuid '%s'" % book_uuid) + row = row._mapping book = self._converter.convert_book(row) if not lazy: self._converter.populate_book(conn, book) @@ -553,8 +558,9 @@ class Connection(base.Connection): gathered = [] try: with contextlib.closing(self._engine.connect()) as conn: - q = sql.select([self._tables.logbooks]) + q = sql.select(self._tables.logbooks) for row in conn.execute(q): + row = row._mapping book = self._converter.convert_book(row) if not lazy: self._converter.populate_book(conn, book) @@ -584,12 +590,13 @@ class Connection(base.Connection): try: flowdetails = self._tables.flowdetails with self._engine.begin() as conn: - q = (sql.select([flowdetails]). + q = (sql.select(flowdetails). where(flowdetails.c.uuid == fd_uuid)) row = conn.execute(q).first() if not row: raise exc.NotFound("No flow details found with uuid" " '%s'" % fd_uuid) + row = row._mapping fd = self._converter.convert_flow_detail(row) if not lazy: self._converter.populate_flow_detail(conn, fd) @@ -603,12 +610,13 @@ class Connection(base.Connection): try: atomdetails = self._tables.atomdetails with self._engine.begin() as conn: - q = (sql.select([atomdetails]). + q = (sql.select(atomdetails). where(atomdetails.c.uuid == ad_uuid)) row = conn.execute(q).first() if not row: raise exc.NotFound("No atom details found with uuid" " '%s'" % ad_uuid) + row = row._mapping return self._converter.convert_atom_detail(row) except sa_exc.SQLAlchemyError: exc.raise_with_cause(exc.StorageFailure, |