Diffstat (limited to 'src/apscheduler/datastores/async_sqlalchemy.py')
 -rw-r--r--  src/apscheduler/datastores/async_sqlalchemy.py | 115
 1 file changed, 9 insertions(+), 106 deletions(-)
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py
index fad2cd3..c0d21cb 100644
--- a/src/apscheduler/datastores/async_sqlalchemy.py
+++ b/src/apscheduler/datastores/async_sqlalchemy.py
@@ -5,7 +5,6 @@ from collections import defaultdict
from contextlib import AsyncExitStack, closing
from datetime import datetime, timedelta, timezone
from json import JSONDecodeError
-from logging import Logger, getLogger
from typing import Any, Callable, Iterable, Optional
from uuid import UUID
@@ -13,29 +12,26 @@ import attr
import sniffio
from anyio import TASK_STATUS_IGNORED, create_task_group, sleep
from attr import asdict
-from sqlalchemy import (
- JSON, TIMESTAMP, Column, Enum, Integer, LargeBinary, MetaData, Table, Unicode, and_, bindparam,
- func, or_, select)
+from sqlalchemy import and_, bindparam, func, or_, select
from sqlalchemy.engine import URL, Result
-from sqlalchemy.exc import CompileError, IntegrityError
+from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncConnection, create_async_engine
from sqlalchemy.ext.asyncio.engine import AsyncEngine
from sqlalchemy.sql.ddl import DropTable
-from sqlalchemy.sql.elements import BindParameter, literal
+from sqlalchemy.sql.elements import BindParameter
from .. import events as events_module
-from ..abc import AsyncDataStore, Job, Schedule, Serializer
-from ..enums import CoalescePolicy, ConflictPolicy, JobOutcome
+from ..abc import AsyncDataStore, Job, Schedule
+from ..enums import ConflictPolicy
from ..events import (
AsyncEventHub, DataStoreEvent, Event, JobAdded, JobDeserializationFailed, ScheduleAdded,
ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded,
TaskRemoved, TaskUpdated)
from ..exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ..marshalling import callable_to_ref
-from ..serializers.pickle import PickleSerializer
from ..structures import JobResult, Task
from ..util import reentrant
-from .sqlalchemy import EmulatedTimestampTZ, EmulatedUUID
+from .sqlalchemy import _BaseSQLAlchemyDataStore
def default_json_handler(obj: Any) -> Any:
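[Note: default_json_handler and json_object_hook, whose bodies fall outside these hunks, are the (de)serialization hooks for the JSON event payloads sent over the notify channel. A minimal sketch of the kind of conversions they plausibly perform; the exact branches and key names are assumptions, not quotes from this file:

    from datetime import datetime, timezone
    from typing import Any
    from uuid import UUID

    def default_json_handler(obj: Any) -> Any:
        # json.dumps() calls this for types it cannot encode natively.
        if isinstance(obj, datetime):
            return obj.timestamp()
        if isinstance(obj, UUID):
            return obj.hex
        raise TypeError(f'Cannot JSON encode type {type(obj)}')

    def json_object_hook(obj: dict[str, Any]) -> Any:
        # Reverse the conversion on the receiving side (assumed key name).
        if 'timestamp' in obj:
            obj['timestamp'] = datetime.fromtimestamp(obj['timestamp'], timezone.utc)
        return obj
]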
@@ -63,37 +59,14 @@ def json_object_hook(obj: dict[str, Any]) -> Any:
@reentrant
@attr.define(eq=False)
-class AsyncSQLAlchemyDataStore(AsyncDataStore):
+class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
engine: AsyncEngine
- schema: Optional[str] = attr.field(default=None, kw_only=True)
- serializer: Serializer = attr.field(factory=PickleSerializer, kw_only=True)
- lock_expiration_delay: float = attr.field(default=30, kw_only=True)
- max_poll_time: Optional[float] = attr.field(default=1, kw_only=True)
- max_idle_time: float = attr.field(default=60, kw_only=True)
- notify_channel: Optional[str] = attr.field(default='apscheduler', kw_only=True)
- start_from_scratch: bool = attr.field(default=False, kw_only=True)
-
- _logger: Logger = attr.field(init=False, factory=lambda: getLogger(__name__))
+
_exit_stack: AsyncExitStack = attr.field(init=False, factory=AsyncExitStack)
_events: AsyncEventHub = attr.field(init=False, factory=AsyncEventHub)
def __attrs_post_init__(self) -> None:
- # Generate the table definitions
- self._metadata = self.get_table_definitions()
- self.t_metadata = self._metadata.tables['metadata']
- self.t_tasks = self._metadata.tables['tasks']
- self.t_schedules = self._metadata.tables['schedules']
- self.t_jobs = self._metadata.tables['jobs']
- self.t_job_results = self._metadata.tables['job_results']
-
- # Find out if the dialect supports UPDATE...RETURNING
- update = self.t_jobs.update().returning(self.t_jobs.c.id)
- try:
- update.compile(bind=self.engine)
- except CompileError:
- self._supports_update_returning = False
- else:
- self._supports_update_returning = True
+ super().__attrs_post_init__()
if self.notify_channel:
if self.engine.dialect.name != 'postgresql' or self.engine.dialect.driver != 'asyncpg':
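[Note: the attribute declarations and the UPDATE...RETURNING capability probe deleted in this hunk now live in _BaseSQLAlchemyDataStore, so super().__attrs_post_init__() is assumed to run something equivalent to the removed lines. A standalone sketch, mirroring the deleted code:

    from sqlalchemy.exc import CompileError

    def _detect_update_returning(self) -> bool:
        # Try compiling a trivial UPDATE...RETURNING for this engine's dialect;
        # SQLAlchemy raises CompileError on dialects that cannot render the
        # RETURNING clause, e.g. older SQLite versions.
        update = self.t_jobs.update().returning(self.t_jobs.c.id)
        try:
            update.compile(bind=self.engine)
        except CompileError:
            return False
        return True
]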
@@ -138,76 +111,6 @@ class AsyncSQLAlchemyDataStore(AsyncDataStore):
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
- def get_table_definitions(self) -> MetaData:
- if self.engine.dialect.name == 'postgresql':
- from sqlalchemy.dialects import postgresql
-
- timestamp_type = TIMESTAMP(timezone=True)
- job_id_type = postgresql.UUID(as_uuid=True)
- else:
- timestamp_type = EmulatedTimestampTZ
- job_id_type = EmulatedUUID
-
- metadata = MetaData()
- Table(
- 'metadata',
- metadata,
- Column('schema_version', Integer, nullable=False)
- )
- Table(
- 'tasks',
- metadata,
- Column('id', Unicode(500), primary_key=True),
- Column('func', Unicode(500), nullable=False),
- Column('state', LargeBinary),
- Column('max_running_jobs', Integer),
- Column('misfire_grace_time', Unicode(16)),
- Column('running_jobs', Integer, nullable=False, server_default=literal(0))
- )
- Table(
- 'schedules',
- metadata,
- Column('id', Unicode(500), primary_key=True),
- Column('task_id', Unicode(500), nullable=False, index=True),
- Column('trigger', LargeBinary),
- Column('args', LargeBinary),
- Column('kwargs', LargeBinary),
- Column('coalesce', Enum(CoalescePolicy), nullable=False),
- Column('misfire_grace_time', Unicode(16)),
- # Column('max_jitter', Unicode(16)),
- Column('tags', JSON, nullable=False),
- Column('next_fire_time', timestamp_type, index=True),
- Column('last_fire_time', timestamp_type),
- Column('acquired_by', Unicode(500)),
- Column('acquired_until', timestamp_type)
- )
- Table(
- 'jobs',
- metadata,
- Column('id', job_id_type, primary_key=True),
- Column('task_id', Unicode(500), nullable=False, index=True),
- Column('args', LargeBinary, nullable=False),
- Column('kwargs', LargeBinary, nullable=False),
- Column('schedule_id', Unicode(500)),
- Column('scheduled_fire_time', timestamp_type),
- Column('start_deadline', timestamp_type),
- Column('tags', JSON, nullable=False),
- Column('created_at', timestamp_type, nullable=False),
- Column('started_at', timestamp_type),
- Column('acquired_by', Unicode(500)),
- Column('acquired_until', timestamp_type)
- )
- Table(
- 'job_results',
- metadata,
- Column('job_id', job_id_type, primary_key=True),
- Column('outcome', Enum(JobOutcome), nullable=False),
- Column('finished_at', timestamp_type, index=True),
- Column('exception', LargeBinary),
- Column('return_value', LargeBinary)
- )
- return metadata
-
async def _publish(self, conn: AsyncConnection, event: DataStoreEvent) -> None:
if self.notify_channel:
event_type = event.__class__.__name__
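[Note: _publish is cut off by the end of the diff. A hedged sketch of how a NOTIFY-based publish plausibly completes, using Postgres's pg_notify(); the payload shape and the local-delivery call are assumptions:

    import json
    from sqlalchemy import text

    async def _publish(self, conn: AsyncConnection, event: DataStoreEvent) -> None:
        if self.notify_channel:
            event_type = event.__class__.__name__
            # Serialize the event and broadcast it; pg_notify() is the
            # SQL-level equivalent of the NOTIFY command and works via asyncpg.
            payload = json.dumps({'type': event_type, **asdict(event)},
                                 default=default_json_handler)
            await conn.execute(
                text('SELECT pg_notify(:channel, :payload)'),
                {'channel': self.notify_channel, 'payload': payload})

        # Also deliver to in-process subscribers (assumed AsyncEventHub API).
        self._events.publish(event)

The dialect check earlier in the diff implies the notify path only works when the store is built on a postgresql+asyncpg engine, e.g. create_async_engine('postgresql+asyncpg://...'); other dialects presumably fall back to polling.]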