diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-03 00:15:23 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-06 01:39:07 +0300 |
commit | 3cff37749365e6312871a3313b38a5b418666f8c (patch) | |
tree | b3ff9f6fa431af7adcb2fb7a3ef2b00871a4f738 | |
parent | fa522949af87c2615592b0342b00c0ee6af1bdde (diff) | |
download | apscheduler-3cff37749365e6312871a3313b38a5b418666f8c.tar.gz |
Mypy fixes
-rw-r--r-- | src/apscheduler/datastores/async_/sqlalchemy.py | 117 | ||||
-rw-r--r-- | src/apscheduler/datastores/sync/sqlalchemy.py | 119 | ||||
-rw-r--r-- | src/apscheduler/schedulers/async_.py | 1 | ||||
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 2 |
4 files changed, 123 insertions, 116 deletions
diff --git a/src/apscheduler/datastores/async_/sqlalchemy.py b/src/apscheduler/datastores/async_/sqlalchemy.py index 3938f7e..caf832c 100644 --- a/src/apscheduler/datastores/async_/sqlalchemy.py +++ b/src/apscheduler/datastores/async_/sqlalchemy.py @@ -16,8 +16,9 @@ from sqlalchemy import ( from sqlalchemy.engine import URL from sqlalchemy.exc import CompileError, IntegrityError from sqlalchemy.ext.asyncio import AsyncConnection, create_async_engine -from sqlalchemy.ext.asyncio.engine import AsyncConnectable +from sqlalchemy.ext.asyncio.engine import AsyncEngine from sqlalchemy.sql.ddl import DropTable +from sqlalchemy.sql.elements import BindParameter from ... import events as events_module from ...abc import AsyncDataStore, Job, Schedule, Serializer @@ -58,12 +59,12 @@ def json_object_hook(obj: Dict[str, Any]) -> Any: @reentrant class SQLAlchemyDataStore(AsyncDataStore): - def __init__(self, bind: AsyncConnectable, *, schema: Optional[str] = None, + def __init__(self, engine: AsyncEngine, *, schema: Optional[str] = None, serializer: Optional[Serializer] = None, lock_expiration_delay: float = 30, max_poll_time: Optional[float] = 1, max_idle_time: float = 60, start_from_scratch: bool = False, notify_channel: Optional[str] = 'apscheduler'): - self.bind = bind + self.engine = engine self.schema = schema self.serializer = serializer or PickleSerializer() self.lock_expiration_delay = lock_expiration_delay @@ -82,9 +83,9 @@ class SQLAlchemyDataStore(AsyncDataStore): self.t_job_results = self._metadata.tables['job_results'] # Find out if the dialect supports RETURNING - statement = self.t_jobs.update().returning(self.t_jobs.c.id) + update = self.t_jobs.update().returning(self.t_jobs.c.id) try: - statement.compile(bind=self.bind) + update.compile(bind=self.engine) except CompileError: self._supports_update_returning = False else: @@ -92,7 +93,7 @@ class SQLAlchemyDataStore(AsyncDataStore): self.notify_channel = notify_channel if notify_channel: - if self.bind.dialect.name != 'postgresql' or self.bind.dialect.driver != 'asyncpg': + if self.engine.dialect.name != 'postgresql' or self.engine.dialect.driver != 'asyncpg': self.notify_channel = None @classmethod @@ -106,7 +107,7 @@ class SQLAlchemyDataStore(AsyncDataStore): raise RuntimeError(f'This data store requires asyncio; currently running: {asynclib}') # Verify that the schema is in place - async with self.bind.begin() as conn: + async with self.engine.begin() as conn: if self.start_from_scratch: for table in self._metadata.sorted_tables: await conn.execute(DropTable(table, if_exists=True)) @@ -135,7 +136,7 @@ class SQLAlchemyDataStore(AsyncDataStore): await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb) def get_table_definitions(self) -> MetaData: - if self.bind.dialect.name in ('mysql', 'mariadb'): + if self.engine.dialect.name in ('mysql', 'mariadb'): from sqlalchemy.dialects.mysql import TIMESTAMP timestamp_type = TIMESTAMP(fsp=6) else: @@ -215,7 +216,7 @@ class SQLAlchemyDataStore(AsyncDataStore): task_started_sent = False while True: - with closing(await self.bind.raw_connection()) as conn: + with closing(await self.engine.raw_connection()) as conn: asyncpg_conn = conn.connection._connection await asyncpg_conn.add_listener(self.notify_channel, callback) if not task_started_sent: @@ -259,13 +260,14 @@ class SQLAlchemyDataStore(AsyncDataStore): self._events.unsubscribe(token) async def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None: + event: Event serialized_data = self.serializer.serialize(schedule) - statement = self.t_schedules.insert().\ + insert = self.t_schedules.insert().\ values(id=schedule.id, task_id=schedule.task_id, serialized_data=serialized_data, next_fire_time=schedule.next_fire_time) try: - async with self.bind.begin() as conn: - await conn.execute(statement) + async with self.engine.begin() as conn: + await conn.execute(insert) event = ScheduleAdded(schedule_id=schedule.id, next_fire_time=schedule.next_fire_time) await self._publish(conn, event) @@ -273,26 +275,26 @@ class SQLAlchemyDataStore(AsyncDataStore): if conflict_policy is ConflictPolicy.exception: raise ConflictingIdError(schedule.id) from None elif conflict_policy is ConflictPolicy.replace: - statement = self.t_schedules.update().\ + update = self.t_schedules.update().\ where(self.t_schedules.c.id == schedule.id).\ values(serialized_data=serialized_data, next_fire_time=schedule.next_fire_time) - async with self.bind.begin() as conn: - await conn.execute(statement) + async with self.engine.begin() as conn: + await conn.execute(update) event = ScheduleUpdated(schedule_id=schedule.id, next_fire_time=schedule.next_fire_time) await self._publish(conn, event) async def remove_schedules(self, ids: Iterable[str]) -> None: - async with self.bind.begin() as conn: - statement = self.t_schedules.delete().where(self.t_schedules.c.id.in_(ids)) + async with self.engine.begin() as conn: + delete = self.t_schedules.delete().where(self.t_schedules.c.id.in_(ids)) if self._supports_update_returning: - statement = statement.returning(self.t_schedules.c.id) - removed_ids = [row[0] for row in await conn.execute(statement)] + delete = delete.returning(self.t_schedules.c.id) + removed_ids: Iterable[str] = [row[0] for row in await conn.execute(delete)] else: # TODO: actually check which rows were deleted? - await conn.execute(statement) + await conn.execute(delete) removed_ids = ids for schedule_id in removed_ids: @@ -304,12 +306,12 @@ class SQLAlchemyDataStore(AsyncDataStore): if ids: query = query.where(self.t_schedules.c.id.in_(ids)) - async with self.bind.begin() as conn: + async with self.engine.begin() as conn: result = await conn.execute(query) return self._deserialize_schedules(result) async def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]: - async with self.bind.begin() as conn: + async with self.engine.begin() as conn: now = datetime.now(timezone.utc) acquired_until = now + timedelta(seconds=self.lock_expiration_delay) schedules_cte = select(self.t_schedules.c.id).\ @@ -320,24 +322,25 @@ class SQLAlchemyDataStore(AsyncDataStore): order_by(self.t_schedules.c.next_fire_time).\ limit(limit).cte() subselect = select([schedules_cte.c.id]) - statement = self.t_schedules.update().\ + update = self.t_schedules.update().\ where(self.t_schedules.c.id.in_(subselect)).\ values(acquired_by=scheduler_id, acquired_until=acquired_until) if self._supports_update_returning: - statement = statement.returning(self.t_schedules.c.id, - self.t_schedules.c.serialized_data) + update = update.returning(self.t_schedules.c.id, + self.t_schedules.c.serialized_data) + result = await conn.execute(update) else: - await conn.execute(statement) - statement = select([self.t_schedules.c.id, self.t_schedules.c.serialized_data]).\ + await conn.execute(update) + query = select([self.t_schedules.c.id, self.t_schedules.c.serialized_data]).\ where(and_(self.t_schedules.c.acquired_by == scheduler_id)) + result = await conn.execute(query) - result = await conn.execute(statement) schedules = self._deserialize_schedules(result) return schedules async def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None: - async with self.bind.begin() as conn: + async with self.engine.begin() as conn: update_events: List[ScheduleUpdated] = [] finished_schedule_ids: List[str] = [] update_args: List[Dict[str, Any]] = [] @@ -361,21 +364,21 @@ class SQLAlchemyDataStore(AsyncDataStore): # Update schedules that have a next fire time if update_args: - p_id = bindparam('p_id') - p_serialized = bindparam('p_serialized_data') - p_next_fire_time = bindparam('p_next_fire_time') - statement = self.t_schedules.update().\ + p_id: BindParameter = bindparam('p_id') + p_serialized: BindParameter = bindparam('p_serialized_data') + p_next_fire_time: BindParameter = bindparam('p_next_fire_time') + update = self.t_schedules.update().\ where(and_(self.t_schedules.c.id == p_id, self.t_schedules.c.acquired_by == scheduler_id)).\ values(serialized_data=p_serialized, next_fire_time=p_next_fire_time, acquired_by=None, acquired_until=None) next_fire_times = {arg['p_id']: arg['p_next_fire_time'] for arg in update_args} if self._supports_update_returning: - statement = statement.returning(self.t_schedules.c.id) - updated_ids = [row[0] for row in await conn.execute(statement, update_args)] + update = update.returning(self.t_schedules.c.id) + updated_ids = [row[0] for row in await conn.execute(update, update_args)] else: # TODO: actually check which rows were updated? - await conn.execute(statement, update_args) + await conn.execute(update, update_args) updated_ids = list(next_fire_times) for schedule_id in updated_ids: @@ -385,9 +388,9 @@ class SQLAlchemyDataStore(AsyncDataStore): # Remove schedules that have no next fire time or failed to serialize if finished_schedule_ids: - statement = self.t_schedules.delete().\ + delete = self.t_schedules.delete().\ where(self.t_schedules.c.id.in_(finished_schedule_ids)) - await conn.execute(statement) + await conn.execute(delete) for event in update_events: await self._publish(conn, event) @@ -400,17 +403,17 @@ class SQLAlchemyDataStore(AsyncDataStore): where(self.t_schedules.c.next_fire_time.isnot(None)).\ order_by(self.t_schedules.c.next_fire_time).\ limit(1) - async with self.bind.begin() as conn: + async with self.engine.begin() as conn: result = await conn.execute(statenent) return result.scalar() async def add_job(self, job: Job) -> None: now = datetime.now(timezone.utc) serialized_data = self.serializer.serialize(job) - statement = self.t_jobs.insert().values(id=job.id.hex, task_id=job.task_id, - created_at=now, serialized_data=serialized_data) - async with self.bind.begin() as conn: - await conn.execute(statement) + insert = self.t_jobs.insert().values(id=job.id.hex, task_id=job.task_id, + created_at=now, serialized_data=serialized_data) + async with self.engine.begin() as conn: + await conn.execute(insert) event = JobAdded(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id, tags=job.tags) @@ -423,13 +426,13 @@ class SQLAlchemyDataStore(AsyncDataStore): job_ids = [job_id.hex for job_id in ids] query = query.where(self.t_jobs.c.id.in_(job_ids)) - async with self.bind.begin() as conn: + async with self.engine.begin() as conn: result = await conn.execute(query) return self._deserialize_jobs(result) async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]: - async with self.bind.begin() as conn: + async with self.engine.begin() as conn: now = datetime.now(timezone.utc) acquired_until = now + timedelta(seconds=self.lock_expiration_delay) query = select([self.t_jobs.c.id, self.t_jobs.c.serialized_data]).\ @@ -441,33 +444,33 @@ class SQLAlchemyDataStore(AsyncDataStore): serialized_jobs: Dict[str, bytes] = {row[0]: row[1] for row in await conn.execute(query)} if serialized_jobs: - query = self.t_jobs.update().\ + update = self.t_jobs.update().\ values(acquired_by=worker_id, acquired_until=acquired_until).\ where(self.t_jobs.c.id.in_(serialized_jobs)) - await conn.execute(query) + await conn.execute(update) return self._deserialize_jobs(serialized_jobs.items()) async def release_job(self, worker_id: str, job_id: UUID, result: Optional[JobResult]) -> None: - async with self.bind.begin() as conn: + async with self.engine.begin() as conn: now = datetime.now(timezone.utc) serialized_data = self.serializer.serialize(result) - statement = self.t_job_results.insert().\ + insert = self.t_job_results.insert().\ values(job_id=job_id.hex, finished_at=now, serialized_data=serialized_data) - await conn.execute(statement) + await conn.execute(insert) - statement = self.t_jobs.delete().where(self.t_jobs.c.id == job_id.hex) - await conn.execute(statement) + delete = self.t_jobs.delete().where(self.t_jobs.c.id == job_id.hex) + await conn.execute(delete) async def get_job_result(self, job_id: UUID) -> Optional[JobResult]: - async with self.bind.begin() as conn: - statement = select(self.t_job_results.c.serialized_data).\ + async with self.engine.begin() as conn: + query = select(self.t_job_results.c.serialized_data).\ where(self.t_job_results.c.job_id == job_id.hex) - result = await conn.execute(statement) + result = await conn.execute(query) - statement = self.t_job_results.delete().\ + delete = self.t_job_results.delete().\ where(self.t_job_results.c.job_id == job_id.hex) - await conn.execute(statement) + await conn.execute(delete) serialized_data = result.scalar() return self.serializer.deserialize(serialized_data) if serialized_data else None diff --git a/src/apscheduler/datastores/sync/sqlalchemy.py b/src/apscheduler/datastores/sync/sqlalchemy.py index dbc286e..f1f541a 100644 --- a/src/apscheduler/datastores/sync/sqlalchemy.py +++ b/src/apscheduler/datastores/sync/sqlalchemy.py @@ -7,10 +7,11 @@ from uuid import UUID from sqlalchemy import ( Column, Integer, LargeBinary, MetaData, Table, Unicode, and_, bindparam, or_, select) -from sqlalchemy.engine import URL, Connectable +from sqlalchemy.engine import URL from sqlalchemy.exc import CompileError, IntegrityError -from sqlalchemy.future import create_engine +from sqlalchemy.future import Engine, create_engine from sqlalchemy.sql.ddl import DropTable +from sqlalchemy.sql.elements import BindParameter from ...abc import DataStore, Job, Schedule, Serializer from ...events import ( @@ -27,11 +28,11 @@ logger = logging.getLogger(__name__) @reentrant class SQLAlchemyDataStore(DataStore): - def __init__(self, bind: Connectable, *, schema: Optional[str] = None, + def __init__(self, engine: Engine, *, schema: Optional[str] = None, serializer: Optional[Serializer] = None, lock_expiration_delay: float = 30, max_poll_time: Optional[float] = 1, max_idle_time: float = 60, start_from_scratch: bool = False): - self.bind = bind + self.engine = engine self.schema = schema self.serializer = serializer or PickleSerializer() self.lock_expiration_delay = lock_expiration_delay @@ -49,9 +50,9 @@ class SQLAlchemyDataStore(DataStore): self.t_job_results = self._metadata.tables['job_results'] # Find out if the dialect supports RETURNING - statement = self.t_jobs.update().returning(self.t_schedules.c.id) + update = self.t_jobs.update().returning(self.t_schedules.c.id) try: - statement.compile(bind=self.bind) + update.compile(bind=self.engine) except CompileError: self._supports_update_returning = False else: @@ -63,7 +64,7 @@ class SQLAlchemyDataStore(DataStore): return cls(engine, **options) def __enter__(self): - with self.bind.begin() as conn: + with self.engine.begin() as conn: if self.start_from_scratch: for table in self._metadata.sorted_tables: conn.execute(DropTable(table, if_exists=True)) @@ -85,7 +86,7 @@ class SQLAlchemyDataStore(DataStore): self._events.__exit__(exc_type, exc_val, exc_tb) def get_table_definitions(self) -> MetaData: - if self.bind.dialect.name in ('mysql', 'mariadb'): + if self.engine.dialect.name in ('mysql', 'mariadb'): from sqlalchemy.dialects.mysql import TIMESTAMP timestamp_type = TIMESTAMP(fsp=6) else: @@ -163,13 +164,14 @@ class SQLAlchemyDataStore(DataStore): self._events.unsubscribe(token) def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None: + event: Event serialized_data = self.serializer.serialize(schedule) - statement = self.t_schedules.insert().\ + insert = self.t_schedules.insert().\ values(id=schedule.id, task_id=schedule.task_id, serialized_data=serialized_data, next_fire_time=schedule.next_fire_time) try: - with self.bind.begin() as conn: - conn.execute(statement) + with self.engine.begin() as conn: + conn.execute(insert) event = ScheduleAdded(schedule_id=schedule.id, next_fire_time=schedule.next_fire_time) self._events.publish(event) @@ -177,26 +179,26 @@ class SQLAlchemyDataStore(DataStore): if conflict_policy is ConflictPolicy.exception: raise ConflictingIdError(schedule.id) from None elif conflict_policy is ConflictPolicy.replace: - statement = self.t_schedules.update().\ + update = self.t_schedules.update().\ where(self.t_schedules.c.id == schedule.id).\ values(serialized_data=serialized_data, next_fire_time=schedule.next_fire_time) - with self.bind.begin() as conn: - conn.execute(statement) + with self.engine.begin() as conn: + conn.execute(update) event = ScheduleUpdated(schedule_id=schedule.id, next_fire_time=schedule.next_fire_time) self._events.publish(event) def remove_schedules(self, ids: Iterable[str]) -> None: - with self.bind.begin() as conn: - statement = self.t_schedules.delete().where(self.t_schedules.c.id.in_(ids)) + with self.engine.begin() as conn: + delete = self.t_schedules.delete().where(self.t_schedules.c.id.in_(ids)) if self._supports_update_returning: - statement = statement.returning(self.t_schedules.c.id) - removed_ids = [row[0] for row in conn.execute(statement)] + delete = delete.returning(self.t_schedules.c.id) + removed_ids: Iterable[str] = [row[0] for row in conn.execute(delete)] else: # TODO: actually check which rows were deleted? - conn.execute(statement) + conn.execute(delete) removed_ids = ids for schedule_id in removed_ids: @@ -208,12 +210,12 @@ class SQLAlchemyDataStore(DataStore): if ids: query = query.where(self.t_schedules.c.id.in_(ids)) - with self.bind.begin() as conn: + with self.engine.begin() as conn: result = conn.execute(query) return self._deserialize_schedules(result) def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]: - with self.bind.begin() as conn: + with self.engine.begin() as conn: now = datetime.now(timezone.utc) acquired_until = now + timedelta(seconds=self.lock_expiration_delay) schedules_cte = select(self.t_schedules.c.id).\ @@ -224,24 +226,25 @@ class SQLAlchemyDataStore(DataStore): order_by(self.t_schedules.c.next_fire_time).\ limit(limit).cte() subselect = select([schedules_cte.c.id]) - statement = self.t_schedules.update().\ + update = self.t_schedules.update().\ where(self.t_schedules.c.id.in_(subselect)).\ values(acquired_by=scheduler_id, acquired_until=acquired_until) if self._supports_update_returning: - statement = statement.returning(self.t_schedules.c.id, - self.t_schedules.c.serialized_data) + update = update.returning(self.t_schedules.c.id, + self.t_schedules.c.serialized_data) + result = conn.execute(update) else: - conn.execute(statement) - statement = select([self.t_schedules.c.id, self.t_schedules.c.serialized_data]).\ + conn.execute(update) + query = select([self.t_schedules.c.id, self.t_schedules.c.serialized_data]).\ where(and_(self.t_schedules.c.acquired_by == scheduler_id)) + result = conn.execute(query) - result = conn.execute(statement) schedules = self._deserialize_schedules(result) return schedules def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None: - with self.bind.begin() as conn: + with self.engine.begin() as conn: update_events: List[ScheduleUpdated] = [] finished_schedule_ids: List[str] = [] update_args: List[Dict[str, Any]] = [] @@ -265,21 +268,21 @@ class SQLAlchemyDataStore(DataStore): # Update schedules that have a next fire time if update_args: - p_id = bindparam('p_id') - p_serialized = bindparam('p_serialized_data') - p_next_fire_time = bindparam('p_next_fire_time') - statement = self.t_schedules.update().\ + p_id: BindParameter = bindparam('p_id') + p_serialized: BindParameter = bindparam('p_serialized_data') + p_next_fire_time: BindParameter = bindparam('p_next_fire_time') + update = self.t_schedules.update().\ where(and_(self.t_schedules.c.id == p_id, self.t_schedules.c.acquired_by == scheduler_id)).\ values(serialized_data=p_serialized, next_fire_time=p_next_fire_time, acquired_by=None, acquired_until=None) next_fire_times = {arg['p_id']: arg['p_next_fire_time'] for arg in update_args} if self._supports_update_returning: - statement = statement.returning(self.t_schedules.c.id) - updated_ids = [row[0] for row in conn.execute(statement, update_args)] + update = update.returning(self.t_schedules.c.id) + updated_ids = [row[0] for row in conn.execute(update, update_args)] else: # TODO: actually check which rows were updated? - conn.execute(statement, update_args) + conn.execute(update, update_args) updated_ids = list(next_fire_times) for schedule_id in updated_ids: @@ -289,9 +292,9 @@ class SQLAlchemyDataStore(DataStore): # Remove schedules that have no next fire time or failed to serialize if finished_schedule_ids: - statement = self.t_schedules.delete().\ + delete = self.t_schedules.delete().\ where(self.t_schedules.c.id.in_(finished_schedule_ids)) - conn.execute(statement) + conn.execute(delete) for event in update_events: self._events.publish(event) @@ -300,21 +303,21 @@ class SQLAlchemyDataStore(DataStore): self._events.publish(ScheduleRemoved(schedule_id=schedule_id)) def get_next_schedule_run_time(self) -> Optional[datetime]: - statenent = select(self.t_schedules.c.id).\ + query = select(self.t_schedules.c.id).\ where(self.t_schedules.c.next_fire_time.isnot(None)).\ order_by(self.t_schedules.c.next_fire_time).\ limit(1) - with self.bind.begin() as conn: - result = conn.execute(statenent) + with self.engine.begin() as conn: + result = conn.execute(query) return result.scalar() def add_job(self, job: Job) -> None: now = datetime.now(timezone.utc) serialized_data = self.serializer.serialize(job) - statement = self.t_jobs.insert().values(id=job.id.hex, task_id=job.task_id, - created_at=now, serialized_data=serialized_data) - with self.bind.begin() as conn: - conn.execute(statement) + insert = self.t_jobs.insert().values(id=job.id.hex, task_id=job.task_id, + created_at=now, serialized_data=serialized_data) + with self.engine.begin() as conn: + conn.execute(insert) event = JobAdded(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id, tags=job.tags) @@ -327,12 +330,12 @@ class SQLAlchemyDataStore(DataStore): job_ids = [job_id.hex for job_id in ids] query = query.where(self.t_jobs.c.id.in_(job_ids)) - with self.bind.begin() as conn: + with self.engine.begin() as conn: result = conn.execute(query) return self._deserialize_jobs(result) def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]: - with self.bind.begin() as conn: + with self.engine.begin() as conn: now = datetime.now(timezone.utc) acquired_until = now + timedelta(seconds=self.lock_expiration_delay) query = select([self.t_jobs.c.id, self.t_jobs.c.serialized_data]).\ @@ -344,33 +347,33 @@ class SQLAlchemyDataStore(DataStore): serialized_jobs: Dict[str, bytes] = {row[0]: row[1] for row in conn.execute(query)} if serialized_jobs: - query = self.t_jobs.update().\ + update = self.t_jobs.update().\ values(acquired_by=worker_id, acquired_until=acquired_until).\ where(self.t_jobs.c.id.in_(serialized_jobs)) - conn.execute(query) + conn.execute(update) return self._deserialize_jobs(serialized_jobs.items()) def release_job(self, worker_id: str, job_id: UUID, result: Optional[JobResult]) -> None: - with self.bind.begin() as conn: + with self.engine.begin() as conn: now = datetime.now(timezone.utc) serialized_result = self.serializer.serialize(result) - statement = self.t_job_results.insert().\ + insert = self.t_job_results.insert().\ values(job_id=job_id.hex, finished_at=now, serialized_data=serialized_result) - conn.execute(statement) + conn.execute(insert) - statement = self.t_jobs.delete().where(self.t_jobs.c.id == job_id.hex) - conn.execute(statement) + delete = self.t_jobs.delete().where(self.t_jobs.c.id == job_id.hex) + conn.execute(delete) def get_job_result(self, job_id: UUID) -> Optional[JobResult]: - with self.bind.begin() as conn: - statement = select(self.t_job_results.c.serialized_data).\ + with self.engine.begin() as conn: + query = select(self.t_job_results.c.serialized_data).\ where(self.t_job_results.c.job_id == job_id.hex) - result = conn.execute(statement) + result = conn.execute(query) - statement = self.t_job_results.delete().\ + delete = self.t_job_results.delete().\ where(self.t_job_results.c.job_id == job_id.hex) - conn.execute(statement) + conn.execute(delete) serialized_result = result.scalar() return self.serializer.deserialize(serialized_result) if serialized_result else None diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 9b69514..bb1760d 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -29,6 +29,7 @@ from ..workers.async_ import AsyncWorker class AsyncScheduler(EventSource): """An asynchronous (AnyIO based) scheduler implementation.""" + data_store: AsyncDataStore _state: RunState = RunState.stopped _wakeup_event: anyio.Event _worker: Optional[AsyncWorker] = None diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index feffa0b..3005d27 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -70,7 +70,7 @@ class Scheduler(EventSource): self._exit_stack.enter_context(self._worker) # Start the worker and return when it has signalled readiness or raised an exception - start_future: Future[None] = Future() + start_future: Future[Event] = Future() token = self._events.subscribe(start_future.set_result) run_future = self._executor.submit(self.run) try: |