From ab290bbb329028976994958d7fee22c28b3b5f97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Wed, 1 Feb 2023 01:22:19 +0200 Subject: Fixed SQLAlchemy 2.0 compatibility Closes #704. --- docs/versionhistory.rst | 1 + src/apscheduler/datastores/sqlalchemy.py | 70 +++++++++++++++++--------------- 2 files changed, 38 insertions(+), 33 deletions(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index e86d486..63967a9 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -33,6 +33,7 @@ APScheduler, see the :doc:`migration section `. - The synchronous scheduler now runs an asyncio event loop in a thread, acting as a façade for ``AsyncScheduler` - Fixed the ``schema`` parameter in ``SQLAlchemyDataStore`` not being applied +- Fixed SQLalchemy 2.0 compatibility **4.0.0a2** diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 3fcae32..fff1c14 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -30,7 +30,12 @@ from sqlalchemy import ( select, ) from sqlalchemy.engine import URL, Dialect, Result -from sqlalchemy.exc import CompileError, IntegrityError, InterfaceError +from sqlalchemy.exc import ( + CompileError, + IntegrityError, + InterfaceError, + ProgrammingError, +) from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine from sqlalchemy.future import Connection, Engine from sqlalchemy.sql import ClauseElement, Executable @@ -120,6 +125,7 @@ class SQLAlchemyDataStore(BaseExternalDataStore): max_idle_time: float = attrs.field(default=60) _is_async: bool = attrs.field(init=False) + _supports_update_returning: bool = attrs.field(init=False, default=False) def __attrs_post_init__(self) -> None: # Generate the table definitions @@ -132,15 +138,6 @@ class SQLAlchemyDataStore(BaseExternalDataStore): self.t_job_results = self._metadata.tables[prefix + "job_results"] self._is_async = isinstance(self.engine, AsyncEngine) - # Find out if the dialect supports UPDATE...RETURNING - update = self.t_jobs.update().returning(self.t_jobs.c.id) - try: - update.compile(bind=self.engine) - except CompileError: - self._supports_update_returning = False - else: - self._supports_update_returning = True - @classmethod def from_url(cls: type[Self], url: str | URL, **options) -> Self: """ @@ -312,7 +309,7 @@ class SQLAlchemyDataStore(BaseExternalDataStore): version = result.scalar() if version is None: await self._execute( - conn, self.t_metadata.insert(values={"schema_version": 1}) + conn, self.t_metadata.insert(), {"schema_version": 1} ) elif version > 1: raise RuntimeError( @@ -321,6 +318,20 @@ class SQLAlchemyDataStore(BaseExternalDataStore): f"APScheduler" ) + # Find out if the dialect supports UPDATE...RETURNING + async for attempt in self._retry(): + with attempt: + update = self.t_metadata.update().returning( + self.t_metadata.c.schema_version + ) + async with self._begin_transaction() as conn: + try: + await self._execute(conn, update) + except (CompileError, ProgrammingError): + pass # the support flag is False by default + else: + self._supports_update_returning = True + async def _deserialize_schedules(self, result: Result) -> list[Schedule]: schedules: list[Schedule] = [] for row in result: @@ -391,14 +402,12 @@ class SQLAlchemyDataStore(BaseExternalDataStore): async def get_task(self, task_id: str) -> Task: query = select( - [ - self.t_tasks.c.id, - self.t_tasks.c.func, - self.t_tasks.c.executor, - self.t_tasks.c.max_running_jobs, - self.t_tasks.c.state, - self.t_tasks.c.misfire_grace_time, - ] + self.t_tasks.c.id, + self.t_tasks.c.func, + self.t_tasks.c.executor, + self.t_tasks.c.max_running_jobs, + self.t_tasks.c.state, + self.t_tasks.c.misfire_grace_time, ).where(self.t_tasks.c.id == task_id) async for attempt in self._retry(): with attempt: @@ -413,14 +422,12 @@ class SQLAlchemyDataStore(BaseExternalDataStore): async def get_tasks(self) -> list[Task]: query = select( - [ - self.t_tasks.c.id, - self.t_tasks.c.func, - self.t_tasks.c.executor, - self.t_tasks.c.max_running_jobs, - self.t_tasks.c.state, - self.t_tasks.c.misfire_grace_time, - ] + self.t_tasks.c.id, + self.t_tasks.c.func, + self.t_tasks.c.executor, + self.t_tasks.c.max_running_jobs, + self.t_tasks.c.state, + self.t_tasks.c.misfire_grace_time, ).order_by(self.t_tasks.c.id) async for attempt in self._retry(): with attempt: @@ -521,7 +528,7 @@ class SQLAlchemyDataStore(BaseExternalDataStore): .with_for_update(skip_locked=True) .cte() ) - subselect = select([schedules_cte.c.id]) + subselect = select(schedules_cte.c.id) update = ( self.t_schedules.update() .where(self.t_schedules.c.id.in_(subselect)) @@ -694,11 +701,8 @@ class SQLAlchemyDataStore(BaseExternalDataStore): # Retrieve the limits query = select( - [ - self.t_tasks.c.id, - self.t_tasks.c.max_running_jobs - - self.t_tasks.c.running_jobs, - ] + self.t_tasks.c.id, + self.t_tasks.c.max_running_jobs - self.t_tasks.c.running_jobs, ).where( self.t_tasks.c.max_running_jobs.isnot(None), self.t_tasks.c.id.in_(task_ids), -- cgit v1.2.1