summary refs log tree commit diff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2023-02-01 01:22:19 +0200
committerAlex Grönholm <alex.gronholm@nextday.fi>2023-02-01 01:22:19 +0200
commitab290bbb329028976994958d7fee22c28b3b5f97 (patch)
tree5ff4f35f0a749073ce25b115afe9b96ad701348e
parent48007ff4ec0b632d990b1b751c7fe9a61bb50433 (diff)
downloadapscheduler-ab290bbb329028976994958d7fee22c28b3b5f97.tar.gz
Fixed SQLAlchemy 2.0 compatibility
Closes #704.
-rw-r--r--docs/versionhistory.rst1
-rw-r--r--src/apscheduler/datastores/sqlalchemy.py70
2 files changed, 38 insertions, 33 deletions
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index e86d486..63967a9 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -33,6 +33,7 @@ APScheduler, see the :doc:`migration section <migration>`.
- The synchronous scheduler now runs an asyncio event loop in a thread, acting as a
  façade for ``AsyncScheduler``
- Fixed the ``schema`` parameter in ``SQLAlchemyDataStore`` not being applied
+- Fixed SQLAlchemy 2.0 compatibility
**4.0.0a2**
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index 3fcae32..fff1c14 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -30,7 +30,12 @@ from sqlalchemy import (
select,
)
from sqlalchemy.engine import URL, Dialect, Result
-from sqlalchemy.exc import CompileError, IntegrityError, InterfaceError
+from sqlalchemy.exc import (
+ CompileError,
+ IntegrityError,
+ InterfaceError,
+ ProgrammingError,
+)
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine
from sqlalchemy.future import Connection, Engine
from sqlalchemy.sql import ClauseElement, Executable
@@ -120,6 +125,7 @@ class SQLAlchemyDataStore(BaseExternalDataStore):
max_idle_time: float = attrs.field(default=60)
_is_async: bool = attrs.field(init=False)
+ _supports_update_returning: bool = attrs.field(init=False, default=False)
def __attrs_post_init__(self) -> None:
# Generate the table definitions
@@ -132,15 +138,6 @@ class SQLAlchemyDataStore(BaseExternalDataStore):
self.t_job_results = self._metadata.tables[prefix + "job_results"]
self._is_async = isinstance(self.engine, AsyncEngine)
- # Find out if the dialect supports UPDATE...RETURNING
- update = self.t_jobs.update().returning(self.t_jobs.c.id)
- try:
- update.compile(bind=self.engine)
- except CompileError:
- self._supports_update_returning = False
- else:
- self._supports_update_returning = True
-
@classmethod
def from_url(cls: type[Self], url: str | URL, **options) -> Self:
"""
@@ -312,7 +309,7 @@ class SQLAlchemyDataStore(BaseExternalDataStore):
version = result.scalar()
if version is None:
await self._execute(
- conn, self.t_metadata.insert(values={"schema_version": 1})
+ conn, self.t_metadata.insert(), {"schema_version": 1}
)
elif version > 1:
raise RuntimeError(
@@ -321,6 +318,20 @@ class SQLAlchemyDataStore(BaseExternalDataStore):
f"APScheduler"
)
+ # Find out if the dialect supports UPDATE...RETURNING
+ async for attempt in self._retry():
+ with attempt:
+ update = self.t_metadata.update().returning(
+ self.t_metadata.c.schema_version
+ )
+ async with self._begin_transaction() as conn:
+ try:
+ await self._execute(conn, update)
+ except (CompileError, ProgrammingError):
+ pass # the support flag is False by default
+ else:
+ self._supports_update_returning = True
+
async def _deserialize_schedules(self, result: Result) -> list[Schedule]:
schedules: list[Schedule] = []
for row in result:
@@ -391,14 +402,12 @@ class SQLAlchemyDataStore(BaseExternalDataStore):
async def get_task(self, task_id: str) -> Task:
query = select(
- [
- self.t_tasks.c.id,
- self.t_tasks.c.func,
- self.t_tasks.c.executor,
- self.t_tasks.c.max_running_jobs,
- self.t_tasks.c.state,
- self.t_tasks.c.misfire_grace_time,
- ]
+ self.t_tasks.c.id,
+ self.t_tasks.c.func,
+ self.t_tasks.c.executor,
+ self.t_tasks.c.max_running_jobs,
+ self.t_tasks.c.state,
+ self.t_tasks.c.misfire_grace_time,
).where(self.t_tasks.c.id == task_id)
async for attempt in self._retry():
with attempt:
@@ -413,14 +422,12 @@ class SQLAlchemyDataStore(BaseExternalDataStore):
async def get_tasks(self) -> list[Task]:
query = select(
- [
- self.t_tasks.c.id,
- self.t_tasks.c.func,
- self.t_tasks.c.executor,
- self.t_tasks.c.max_running_jobs,
- self.t_tasks.c.state,
- self.t_tasks.c.misfire_grace_time,
- ]
+ self.t_tasks.c.id,
+ self.t_tasks.c.func,
+ self.t_tasks.c.executor,
+ self.t_tasks.c.max_running_jobs,
+ self.t_tasks.c.state,
+ self.t_tasks.c.misfire_grace_time,
).order_by(self.t_tasks.c.id)
async for attempt in self._retry():
with attempt:
@@ -521,7 +528,7 @@ class SQLAlchemyDataStore(BaseExternalDataStore):
.with_for_update(skip_locked=True)
.cte()
)
- subselect = select([schedules_cte.c.id])
+ subselect = select(schedules_cte.c.id)
update = (
self.t_schedules.update()
.where(self.t_schedules.c.id.in_(subselect))
@@ -694,11 +701,8 @@ class SQLAlchemyDataStore(BaseExternalDataStore):
# Retrieve the limits
query = select(
- [
- self.t_tasks.c.id,
- self.t_tasks.c.max_running_jobs
- - self.t_tasks.c.running_jobs,
- ]
+ self.t_tasks.c.id,
+ self.t_tasks.c.max_running_jobs - self.t_tasks.c.running_jobs,
).where(
self.t_tasks.c.max_running_jobs.isnot(None),
self.t_tasks.c.id.in_(task_ids),