author    Alex Grönholm <alex.gronholm@nextday.fi>  2021-09-03 00:15:23 +0300
committer Alex Grönholm <alex.gronholm@nextday.fi>  2021-09-06 01:39:07 +0300
commit    3cff37749365e6312871a3313b38a5b418666f8c (patch)
tree      b3ff9f6fa431af7adcb2fb7a3ef2b00871a4f738
parent    fa522949af87c2615592b0342b00c0ee6af1bdde (diff)
download  apscheduler-3cff37749365e6312871a3313b38a5b418666f8c.tar.gz
Mypy fixes
-rw-r--r--  src/apscheduler/datastores/async_/sqlalchemy.py  117
-rw-r--r--  src/apscheduler/datastores/sync/sqlalchemy.py    119
-rw-r--r--  src/apscheduler/schedulers/async_.py               1
-rw-r--r--  src/apscheduler/schedulers/sync.py                 2
4 files changed, 123 insertions(+), 116 deletions(-)
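
Note: every hunk in this commit serves static typing. The data store constructors now take a concrete AsyncEngine/Engine instead of the looser AsyncConnectable/Connectable, the catch-all `statement` locals are split by statement kind (insert/update/delete/query) so each name keeps a single type, and explicit annotations (Event, BindParameter, Future[Event]) pin down types mypy could not infer. A minimal usage sketch under the new async signature; the DSN and option value below are illustrative, not taken from the commit:

    from sqlalchemy.ext.asyncio import create_async_engine

    from apscheduler.datastores.async_.sqlalchemy import SQLAlchemyDataStore

    # The store now requires an AsyncEngine, so mypy can verify the
    # engine.dialect and engine.begin() accesses used throughout this diff.
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    store = SQLAlchemyDataStore(engine, start_from_scratch=True)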
diff --git a/src/apscheduler/datastores/async_/sqlalchemy.py b/src/apscheduler/datastores/async_/sqlalchemy.py
index 3938f7e..caf832c 100644
--- a/src/apscheduler/datastores/async_/sqlalchemy.py
+++ b/src/apscheduler/datastores/async_/sqlalchemy.py
@@ -16,8 +16,9 @@ from sqlalchemy import (
from sqlalchemy.engine import URL
from sqlalchemy.exc import CompileError, IntegrityError
from sqlalchemy.ext.asyncio import AsyncConnection, create_async_engine
-from sqlalchemy.ext.asyncio.engine import AsyncConnectable
+from sqlalchemy.ext.asyncio.engine import AsyncEngine
from sqlalchemy.sql.ddl import DropTable
+from sqlalchemy.sql.elements import BindParameter
from ... import events as events_module
from ...abc import AsyncDataStore, Job, Schedule, Serializer
@@ -58,12 +59,12 @@ def json_object_hook(obj: Dict[str, Any]) -> Any:
@reentrant
class SQLAlchemyDataStore(AsyncDataStore):
- def __init__(self, bind: AsyncConnectable, *, schema: Optional[str] = None,
+ def __init__(self, engine: AsyncEngine, *, schema: Optional[str] = None,
serializer: Optional[Serializer] = None,
lock_expiration_delay: float = 30, max_poll_time: Optional[float] = 1,
max_idle_time: float = 60, start_from_scratch: bool = False,
notify_channel: Optional[str] = 'apscheduler'):
- self.bind = bind
+ self.engine = engine
self.schema = schema
self.serializer = serializer or PickleSerializer()
self.lock_expiration_delay = lock_expiration_delay
@@ -82,9 +83,9 @@ class SQLAlchemyDataStore(AsyncDataStore):
self.t_job_results = self._metadata.tables['job_results']
# Find out if the dialect supports RETURNING
- statement = self.t_jobs.update().returning(self.t_jobs.c.id)
+ update = self.t_jobs.update().returning(self.t_jobs.c.id)
try:
- statement.compile(bind=self.bind)
+ update.compile(bind=self.engine)
except CompileError:
self._supports_update_returning = False
else:
@@ -92,7 +93,7 @@ class SQLAlchemyDataStore(AsyncDataStore):
self.notify_channel = notify_channel
if notify_channel:
- if self.bind.dialect.name != 'postgresql' or self.bind.dialect.driver != 'asyncpg':
+ if self.engine.dialect.name != 'postgresql' or self.engine.dialect.driver != 'asyncpg':
self.notify_channel = None
@classmethod
@@ -106,7 +107,7 @@ class SQLAlchemyDataStore(AsyncDataStore):
raise RuntimeError(f'This data store requires asyncio; currently running: {asynclib}')
# Verify that the schema is in place
- async with self.bind.begin() as conn:
+ async with self.engine.begin() as conn:
if self.start_from_scratch:
for table in self._metadata.sorted_tables:
await conn.execute(DropTable(table, if_exists=True))
@@ -135,7 +136,7 @@ class SQLAlchemyDataStore(AsyncDataStore):
await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
def get_table_definitions(self) -> MetaData:
- if self.bind.dialect.name in ('mysql', 'mariadb'):
+ if self.engine.dialect.name in ('mysql', 'mariadb'):
from sqlalchemy.dialects.mysql import TIMESTAMP
timestamp_type = TIMESTAMP(fsp=6)
else:
@@ -215,7 +216,7 @@ class SQLAlchemyDataStore(AsyncDataStore):
task_started_sent = False
while True:
- with closing(await self.bind.raw_connection()) as conn:
+ with closing(await self.engine.raw_connection()) as conn:
asyncpg_conn = conn.connection._connection
await asyncpg_conn.add_listener(self.notify_channel, callback)
if not task_started_sent:
@@ -259,13 +260,14 @@ class SQLAlchemyDataStore(AsyncDataStore):
self._events.unsubscribe(token)
async def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None:
+ event: Event
serialized_data = self.serializer.serialize(schedule)
- statement = self.t_schedules.insert().\
+ insert = self.t_schedules.insert().\
values(id=schedule.id, task_id=schedule.task_id, serialized_data=serialized_data,
next_fire_time=schedule.next_fire_time)
try:
- async with self.bind.begin() as conn:
- await conn.execute(statement)
+ async with self.engine.begin() as conn:
+ await conn.execute(insert)
event = ScheduleAdded(schedule_id=schedule.id,
next_fire_time=schedule.next_fire_time)
await self._publish(conn, event)
@@ -273,26 +275,26 @@ class SQLAlchemyDataStore(AsyncDataStore):
if conflict_policy is ConflictPolicy.exception:
raise ConflictingIdError(schedule.id) from None
elif conflict_policy is ConflictPolicy.replace:
- statement = self.t_schedules.update().\
+ update = self.t_schedules.update().\
where(self.t_schedules.c.id == schedule.id).\
values(serialized_data=serialized_data,
next_fire_time=schedule.next_fire_time)
- async with self.bind.begin() as conn:
- await conn.execute(statement)
+ async with self.engine.begin() as conn:
+ await conn.execute(update)
event = ScheduleUpdated(schedule_id=schedule.id,
next_fire_time=schedule.next_fire_time)
await self._publish(conn, event)
async def remove_schedules(self, ids: Iterable[str]) -> None:
- async with self.bind.begin() as conn:
- statement = self.t_schedules.delete().where(self.t_schedules.c.id.in_(ids))
+ async with self.engine.begin() as conn:
+ delete = self.t_schedules.delete().where(self.t_schedules.c.id.in_(ids))
if self._supports_update_returning:
- statement = statement.returning(self.t_schedules.c.id)
- removed_ids = [row[0] for row in await conn.execute(statement)]
+ delete = delete.returning(self.t_schedules.c.id)
+ removed_ids: Iterable[str] = [row[0] for row in await conn.execute(delete)]
else:
# TODO: actually check which rows were deleted?
- await conn.execute(statement)
+ await conn.execute(delete)
removed_ids = ids
for schedule_id in removed_ids:
@@ -304,12 +306,12 @@ class SQLAlchemyDataStore(AsyncDataStore):
if ids:
query = query.where(self.t_schedules.c.id.in_(ids))
- async with self.bind.begin() as conn:
+ async with self.engine.begin() as conn:
result = await conn.execute(query)
return self._deserialize_schedules(result)
async def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]:
- async with self.bind.begin() as conn:
+ async with self.engine.begin() as conn:
now = datetime.now(timezone.utc)
acquired_until = now + timedelta(seconds=self.lock_expiration_delay)
schedules_cte = select(self.t_schedules.c.id).\
@@ -320,24 +322,25 @@ class SQLAlchemyDataStore(AsyncDataStore):
order_by(self.t_schedules.c.next_fire_time).\
limit(limit).cte()
subselect = select([schedules_cte.c.id])
- statement = self.t_schedules.update().\
+ update = self.t_schedules.update().\
where(self.t_schedules.c.id.in_(subselect)).\
values(acquired_by=scheduler_id, acquired_until=acquired_until)
if self._supports_update_returning:
- statement = statement.returning(self.t_schedules.c.id,
- self.t_schedules.c.serialized_data)
+ update = update.returning(self.t_schedules.c.id,
+ self.t_schedules.c.serialized_data)
+ result = await conn.execute(update)
else:
- await conn.execute(statement)
- statement = select([self.t_schedules.c.id, self.t_schedules.c.serialized_data]).\
+ await conn.execute(update)
+ query = select([self.t_schedules.c.id, self.t_schedules.c.serialized_data]).\
where(and_(self.t_schedules.c.acquired_by == scheduler_id))
- result = await conn.execute(statement)
+ result = await conn.execute(query)
schedules = self._deserialize_schedules(result)
return schedules
async def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None:
- async with self.bind.begin() as conn:
+ async with self.engine.begin() as conn:
update_events: List[ScheduleUpdated] = []
finished_schedule_ids: List[str] = []
update_args: List[Dict[str, Any]] = []
@@ -361,21 +364,21 @@ class SQLAlchemyDataStore(AsyncDataStore):
# Update schedules that have a next fire time
if update_args:
- p_id = bindparam('p_id')
- p_serialized = bindparam('p_serialized_data')
- p_next_fire_time = bindparam('p_next_fire_time')
- statement = self.t_schedules.update().\
+ p_id: BindParameter = bindparam('p_id')
+ p_serialized: BindParameter = bindparam('p_serialized_data')
+ p_next_fire_time: BindParameter = bindparam('p_next_fire_time')
+ update = self.t_schedules.update().\
where(and_(self.t_schedules.c.id == p_id,
self.t_schedules.c.acquired_by == scheduler_id)).\
values(serialized_data=p_serialized, next_fire_time=p_next_fire_time,
acquired_by=None, acquired_until=None)
next_fire_times = {arg['p_id']: arg['p_next_fire_time'] for arg in update_args}
if self._supports_update_returning:
- statement = statement.returning(self.t_schedules.c.id)
- updated_ids = [row[0] for row in await conn.execute(statement, update_args)]
+ update = update.returning(self.t_schedules.c.id)
+ updated_ids = [row[0] for row in await conn.execute(update, update_args)]
else:
# TODO: actually check which rows were updated?
- await conn.execute(statement, update_args)
+ await conn.execute(update, update_args)
updated_ids = list(next_fire_times)
for schedule_id in updated_ids:
@@ -385,9 +388,9 @@ class SQLAlchemyDataStore(AsyncDataStore):
# Remove schedules that have no next fire time or failed to serialize
if finished_schedule_ids:
- statement = self.t_schedules.delete().\
+ delete = self.t_schedules.delete().\
where(self.t_schedules.c.id.in_(finished_schedule_ids))
- await conn.execute(statement)
+ await conn.execute(delete)
for event in update_events:
await self._publish(conn, event)
@@ -400,17 +403,17 @@ class SQLAlchemyDataStore(AsyncDataStore):
where(self.t_schedules.c.next_fire_time.isnot(None)).\
order_by(self.t_schedules.c.next_fire_time).\
limit(1)
- async with self.bind.begin() as conn:
+ async with self.engine.begin() as conn:
result = await conn.execute(statenent)
return result.scalar()
async def add_job(self, job: Job) -> None:
now = datetime.now(timezone.utc)
serialized_data = self.serializer.serialize(job)
- statement = self.t_jobs.insert().values(id=job.id.hex, task_id=job.task_id,
- created_at=now, serialized_data=serialized_data)
- async with self.bind.begin() as conn:
- await conn.execute(statement)
+ insert = self.t_jobs.insert().values(id=job.id.hex, task_id=job.task_id,
+ created_at=now, serialized_data=serialized_data)
+ async with self.engine.begin() as conn:
+ await conn.execute(insert)
event = JobAdded(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id,
tags=job.tags)
@@ -423,13 +426,13 @@ class SQLAlchemyDataStore(AsyncDataStore):
job_ids = [job_id.hex for job_id in ids]
query = query.where(self.t_jobs.c.id.in_(job_ids))
- async with self.bind.begin() as conn:
+ async with self.engine.begin() as conn:
result = await conn.execute(query)
return self._deserialize_jobs(result)
async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]:
- async with self.bind.begin() as conn:
+ async with self.engine.begin() as conn:
now = datetime.now(timezone.utc)
acquired_until = now + timedelta(seconds=self.lock_expiration_delay)
query = select([self.t_jobs.c.id, self.t_jobs.c.serialized_data]).\
@@ -441,33 +444,33 @@ class SQLAlchemyDataStore(AsyncDataStore):
serialized_jobs: Dict[str, bytes] = {row[0]: row[1]
for row in await conn.execute(query)}
if serialized_jobs:
- query = self.t_jobs.update().\
+ update = self.t_jobs.update().\
values(acquired_by=worker_id, acquired_until=acquired_until).\
where(self.t_jobs.c.id.in_(serialized_jobs))
- await conn.execute(query)
+ await conn.execute(update)
return self._deserialize_jobs(serialized_jobs.items())
async def release_job(self, worker_id: str, job_id: UUID, result: Optional[JobResult]) -> None:
- async with self.bind.begin() as conn:
+ async with self.engine.begin() as conn:
now = datetime.now(timezone.utc)
serialized_data = self.serializer.serialize(result)
- statement = self.t_job_results.insert().\
+ insert = self.t_job_results.insert().\
values(job_id=job_id.hex, finished_at=now, serialized_data=serialized_data)
- await conn.execute(statement)
+ await conn.execute(insert)
- statement = self.t_jobs.delete().where(self.t_jobs.c.id == job_id.hex)
- await conn.execute(statement)
+ delete = self.t_jobs.delete().where(self.t_jobs.c.id == job_id.hex)
+ await conn.execute(delete)
async def get_job_result(self, job_id: UUID) -> Optional[JobResult]:
- async with self.bind.begin() as conn:
- statement = select(self.t_job_results.c.serialized_data).\
+ async with self.engine.begin() as conn:
+ query = select(self.t_job_results.c.serialized_data).\
where(self.t_job_results.c.job_id == job_id.hex)
- result = await conn.execute(statement)
+ result = await conn.execute(query)
- statement = self.t_job_results.delete().\
+ delete = self.t_job_results.delete().\
where(self.t_job_results.c.job_id == job_id.hex)
- await conn.execute(statement)
+ await conn.execute(delete)
serialized_data = result.scalar()
return self.serializer.deserialize(serialized_data) if serialized_data else None
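
Note on the RETURNING probe near the top of both data store files: compiling an UPDATE ... RETURNING against the engine raises CompileError on dialects that cannot render RETURNING, in which case the store falls back to the two-step update-then-select path visible in acquire_schedules. A self-contained sketch of the same probe with an illustrative table; whether it succeeds depends on both the dialect and the SQLAlchemy version (SQLite, for instance, only gained RETURNING in 3.35):

    from sqlalchemy import Column, Integer, MetaData, Table, create_engine
    from sqlalchemy.exc import CompileError

    engine = create_engine("sqlite://")
    metadata = MetaData()
    jobs = Table("jobs", metadata, Column("id", Integer, primary_key=True))

    # Compile (without executing) an UPDATE ... RETURNING; a dialect that
    # cannot render RETURNING raises CompileError at compile time.
    update = jobs.update().returning(jobs.c.id)
    try:
        update.compile(bind=engine)
    except CompileError:
        supports_update_returning = False
    else:
        supports_update_returning = True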
diff --git a/src/apscheduler/datastores/sync/sqlalchemy.py b/src/apscheduler/datastores/sync/sqlalchemy.py
index dbc286e..f1f541a 100644
--- a/src/apscheduler/datastores/sync/sqlalchemy.py
+++ b/src/apscheduler/datastores/sync/sqlalchemy.py
@@ -7,10 +7,11 @@ from uuid import UUID
from sqlalchemy import (
Column, Integer, LargeBinary, MetaData, Table, Unicode, and_, bindparam, or_, select)
-from sqlalchemy.engine import URL, Connectable
+from sqlalchemy.engine import URL
from sqlalchemy.exc import CompileError, IntegrityError
-from sqlalchemy.future import create_engine
+from sqlalchemy.future import Engine, create_engine
from sqlalchemy.sql.ddl import DropTable
+from sqlalchemy.sql.elements import BindParameter
from ...abc import DataStore, Job, Schedule, Serializer
from ...events import (
@@ -27,11 +28,11 @@ logger = logging.getLogger(__name__)
@reentrant
class SQLAlchemyDataStore(DataStore):
- def __init__(self, bind: Connectable, *, schema: Optional[str] = None,
+ def __init__(self, engine: Engine, *, schema: Optional[str] = None,
serializer: Optional[Serializer] = None,
lock_expiration_delay: float = 30, max_poll_time: Optional[float] = 1,
max_idle_time: float = 60, start_from_scratch: bool = False):
- self.bind = bind
+ self.engine = engine
self.schema = schema
self.serializer = serializer or PickleSerializer()
self.lock_expiration_delay = lock_expiration_delay
@@ -49,9 +50,9 @@ class SQLAlchemyDataStore(DataStore):
self.t_job_results = self._metadata.tables['job_results']
# Find out if the dialect supports RETURNING
- statement = self.t_jobs.update().returning(self.t_schedules.c.id)
+ update = self.t_jobs.update().returning(self.t_schedules.c.id)
try:
- statement.compile(bind=self.bind)
+ update.compile(bind=self.engine)
except CompileError:
self._supports_update_returning = False
else:
@@ -63,7 +64,7 @@ class SQLAlchemyDataStore(DataStore):
return cls(engine, **options)
def __enter__(self):
- with self.bind.begin() as conn:
+ with self.engine.begin() as conn:
if self.start_from_scratch:
for table in self._metadata.sorted_tables:
conn.execute(DropTable(table, if_exists=True))
@@ -85,7 +86,7 @@ class SQLAlchemyDataStore(DataStore):
self._events.__exit__(exc_type, exc_val, exc_tb)
def get_table_definitions(self) -> MetaData:
- if self.bind.dialect.name in ('mysql', 'mariadb'):
+ if self.engine.dialect.name in ('mysql', 'mariadb'):
from sqlalchemy.dialects.mysql import TIMESTAMP
timestamp_type = TIMESTAMP(fsp=6)
else:
@@ -163,13 +164,14 @@ class SQLAlchemyDataStore(DataStore):
self._events.unsubscribe(token)
def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None:
+ event: Event
serialized_data = self.serializer.serialize(schedule)
- statement = self.t_schedules.insert().\
+ insert = self.t_schedules.insert().\
values(id=schedule.id, task_id=schedule.task_id, serialized_data=serialized_data,
next_fire_time=schedule.next_fire_time)
try:
- with self.bind.begin() as conn:
- conn.execute(statement)
+ with self.engine.begin() as conn:
+ conn.execute(insert)
event = ScheduleAdded(schedule_id=schedule.id,
next_fire_time=schedule.next_fire_time)
self._events.publish(event)
@@ -177,26 +179,26 @@ class SQLAlchemyDataStore(DataStore):
if conflict_policy is ConflictPolicy.exception:
raise ConflictingIdError(schedule.id) from None
elif conflict_policy is ConflictPolicy.replace:
- statement = self.t_schedules.update().\
+ update = self.t_schedules.update().\
where(self.t_schedules.c.id == schedule.id).\
values(serialized_data=serialized_data,
next_fire_time=schedule.next_fire_time)
- with self.bind.begin() as conn:
- conn.execute(statement)
+ with self.engine.begin() as conn:
+ conn.execute(update)
event = ScheduleUpdated(schedule_id=schedule.id,
next_fire_time=schedule.next_fire_time)
self._events.publish(event)
def remove_schedules(self, ids: Iterable[str]) -> None:
- with self.bind.begin() as conn:
- statement = self.t_schedules.delete().where(self.t_schedules.c.id.in_(ids))
+ with self.engine.begin() as conn:
+ delete = self.t_schedules.delete().where(self.t_schedules.c.id.in_(ids))
if self._supports_update_returning:
- statement = statement.returning(self.t_schedules.c.id)
- removed_ids = [row[0] for row in conn.execute(statement)]
+ delete = delete.returning(self.t_schedules.c.id)
+ removed_ids: Iterable[str] = [row[0] for row in conn.execute(delete)]
else:
# TODO: actually check which rows were deleted?
- conn.execute(statement)
+ conn.execute(delete)
removed_ids = ids
for schedule_id in removed_ids:
@@ -208,12 +210,12 @@ class SQLAlchemyDataStore(DataStore):
if ids:
query = query.where(self.t_schedules.c.id.in_(ids))
- with self.bind.begin() as conn:
+ with self.engine.begin() as conn:
result = conn.execute(query)
return self._deserialize_schedules(result)
def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]:
- with self.bind.begin() as conn:
+ with self.engine.begin() as conn:
now = datetime.now(timezone.utc)
acquired_until = now + timedelta(seconds=self.lock_expiration_delay)
schedules_cte = select(self.t_schedules.c.id).\
@@ -224,24 +226,25 @@ class SQLAlchemyDataStore(DataStore):
order_by(self.t_schedules.c.next_fire_time).\
limit(limit).cte()
subselect = select([schedules_cte.c.id])
- statement = self.t_schedules.update().\
+ update = self.t_schedules.update().\
where(self.t_schedules.c.id.in_(subselect)).\
values(acquired_by=scheduler_id, acquired_until=acquired_until)
if self._supports_update_returning:
- statement = statement.returning(self.t_schedules.c.id,
- self.t_schedules.c.serialized_data)
+ update = update.returning(self.t_schedules.c.id,
+ self.t_schedules.c.serialized_data)
+ result = conn.execute(update)
else:
- conn.execute(statement)
- statement = select([self.t_schedules.c.id, self.t_schedules.c.serialized_data]).\
+ conn.execute(update)
+ query = select([self.t_schedules.c.id, self.t_schedules.c.serialized_data]).\
where(and_(self.t_schedules.c.acquired_by == scheduler_id))
- result = conn.execute(statement)
+ result = conn.execute(query)
schedules = self._deserialize_schedules(result)
return schedules
def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None:
- with self.bind.begin() as conn:
+ with self.engine.begin() as conn:
update_events: List[ScheduleUpdated] = []
finished_schedule_ids: List[str] = []
update_args: List[Dict[str, Any]] = []
@@ -265,21 +268,21 @@ class SQLAlchemyDataStore(DataStore):
# Update schedules that have a next fire time
if update_args:
- p_id = bindparam('p_id')
- p_serialized = bindparam('p_serialized_data')
- p_next_fire_time = bindparam('p_next_fire_time')
- statement = self.t_schedules.update().\
+ p_id: BindParameter = bindparam('p_id')
+ p_serialized: BindParameter = bindparam('p_serialized_data')
+ p_next_fire_time: BindParameter = bindparam('p_next_fire_time')
+ update = self.t_schedules.update().\
where(and_(self.t_schedules.c.id == p_id,
self.t_schedules.c.acquired_by == scheduler_id)).\
values(serialized_data=p_serialized, next_fire_time=p_next_fire_time,
acquired_by=None, acquired_until=None)
next_fire_times = {arg['p_id']: arg['p_next_fire_time'] for arg in update_args}
if self._supports_update_returning:
- statement = statement.returning(self.t_schedules.c.id)
- updated_ids = [row[0] for row in conn.execute(statement, update_args)]
+ update = update.returning(self.t_schedules.c.id)
+ updated_ids = [row[0] for row in conn.execute(update, update_args)]
else:
# TODO: actually check which rows were updated?
- conn.execute(statement, update_args)
+ conn.execute(update, update_args)
updated_ids = list(next_fire_times)
for schedule_id in updated_ids:
@@ -289,9 +292,9 @@ class SQLAlchemyDataStore(DataStore):
# Remove schedules that have no next fire time or failed to serialize
if finished_schedule_ids:
- statement = self.t_schedules.delete().\
+ delete = self.t_schedules.delete().\
where(self.t_schedules.c.id.in_(finished_schedule_ids))
- conn.execute(statement)
+ conn.execute(delete)
for event in update_events:
self._events.publish(event)
@@ -300,21 +303,21 @@ class SQLAlchemyDataStore(DataStore):
self._events.publish(ScheduleRemoved(schedule_id=schedule_id))
def get_next_schedule_run_time(self) -> Optional[datetime]:
- statenent = select(self.t_schedules.c.id).\
+ query = select(self.t_schedules.c.id).\
where(self.t_schedules.c.next_fire_time.isnot(None)).\
order_by(self.t_schedules.c.next_fire_time).\
limit(1)
- with self.bind.begin() as conn:
- result = conn.execute(statenent)
+ with self.engine.begin() as conn:
+ result = conn.execute(query)
return result.scalar()
def add_job(self, job: Job) -> None:
now = datetime.now(timezone.utc)
serialized_data = self.serializer.serialize(job)
- statement = self.t_jobs.insert().values(id=job.id.hex, task_id=job.task_id,
- created_at=now, serialized_data=serialized_data)
- with self.bind.begin() as conn:
- conn.execute(statement)
+ insert = self.t_jobs.insert().values(id=job.id.hex, task_id=job.task_id,
+ created_at=now, serialized_data=serialized_data)
+ with self.engine.begin() as conn:
+ conn.execute(insert)
event = JobAdded(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id,
tags=job.tags)
@@ -327,12 +330,12 @@ class SQLAlchemyDataStore(DataStore):
job_ids = [job_id.hex for job_id in ids]
query = query.where(self.t_jobs.c.id.in_(job_ids))
- with self.bind.begin() as conn:
+ with self.engine.begin() as conn:
result = conn.execute(query)
return self._deserialize_jobs(result)
def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]:
- with self.bind.begin() as conn:
+ with self.engine.begin() as conn:
now = datetime.now(timezone.utc)
acquired_until = now + timedelta(seconds=self.lock_expiration_delay)
query = select([self.t_jobs.c.id, self.t_jobs.c.serialized_data]).\
@@ -344,33 +347,33 @@ class SQLAlchemyDataStore(DataStore):
serialized_jobs: Dict[str, bytes] = {row[0]: row[1]
for row in conn.execute(query)}
if serialized_jobs:
- query = self.t_jobs.update().\
+ update = self.t_jobs.update().\
values(acquired_by=worker_id, acquired_until=acquired_until).\
where(self.t_jobs.c.id.in_(serialized_jobs))
- conn.execute(query)
+ conn.execute(update)
return self._deserialize_jobs(serialized_jobs.items())
def release_job(self, worker_id: str, job_id: UUID, result: Optional[JobResult]) -> None:
- with self.bind.begin() as conn:
+ with self.engine.begin() as conn:
now = datetime.now(timezone.utc)
serialized_result = self.serializer.serialize(result)
- statement = self.t_job_results.insert().\
+ insert = self.t_job_results.insert().\
values(job_id=job_id.hex, finished_at=now, serialized_data=serialized_result)
- conn.execute(statement)
+ conn.execute(insert)
- statement = self.t_jobs.delete().where(self.t_jobs.c.id == job_id.hex)
- conn.execute(statement)
+ delete = self.t_jobs.delete().where(self.t_jobs.c.id == job_id.hex)
+ conn.execute(delete)
def get_job_result(self, job_id: UUID) -> Optional[JobResult]:
- with self.bind.begin() as conn:
- statement = select(self.t_job_results.c.serialized_data).\
+ with self.engine.begin() as conn:
+ query = select(self.t_job_results.c.serialized_data).\
where(self.t_job_results.c.job_id == job_id.hex)
- result = conn.execute(statement)
+ result = conn.execute(query)
- statement = self.t_job_results.delete().\
+ delete = self.t_job_results.delete().\
where(self.t_job_results.c.job_id == job_id.hex)
- conn.execute(statement)
+ conn.execute(delete)
serialized_result = result.scalar()
return self.serializer.deserialize(serialized_result) if serialized_result else None
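
Note on the release_schedules hunks above: the new BindParameter annotations exist presumably because mypy cannot pin down bindparam()'s return type on its own (an inference; the commit message says only "Mypy fixes"). The pattern itself is an executemany: one bound parameter per key, executed with a list of per-row parameter dicts. A self-contained sketch with an illustrative table:

    from sqlalchemy import (
        Column, DateTime, MetaData, Table, Unicode, bindparam, create_engine)
    from sqlalchemy.sql.elements import BindParameter

    engine = create_engine("sqlite://")
    metadata = MetaData()
    schedules = Table("schedules", metadata,
                      Column("id", Unicode(500), primary_key=True),
                      Column("next_fire_time", DateTime))
    metadata.create_all(engine)

    # One bound parameter per updated value; executing with a list of dicts
    # runs the UPDATE once per dict ("executemany" semantics).
    p_id: BindParameter = bindparam("p_id")
    p_next_fire_time: BindParameter = bindparam("p_next_fire_time")
    update = schedules.update().\
        where(schedules.c.id == p_id).\
        values(next_fire_time=p_next_fire_time)
    update_args = [{"p_id": "sched-1", "p_next_fire_time": None},
                   {"p_id": "sched-2", "p_next_fire_time": None}]
    with engine.begin() as conn:
        conn.execute(update, update_args)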
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index 9b69514..bb1760d 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -29,6 +29,7 @@ from ..workers.async_ import AsyncWorker
class AsyncScheduler(EventSource):
"""An asynchronous (AnyIO based) scheduler implementation."""
+ data_store: AsyncDataStore
_state: RunState = RunState.stopped
_wakeup_event: anyio.Event
_worker: Optional[AsyncWorker] = None
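
Note: the one-line scheduler change declares the type of the data_store attribute at class level, giving mypy a concrete interface to check data store calls against. A minimal illustration of the principle; the Protocol below is a stand-in, not apscheduler's actual AsyncDataStore ABC:

    from typing import Protocol

    class DataStoreLike(Protocol):
        async def add_job(self, job: object) -> None: ...

    class SchedulerSketch:
        data_store: DataStoreLike  # declared type: attribute use is checked

        async def submit(self, job: object) -> None:
            await self.data_store.add_job(job)  # verified against the Protocol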
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index feffa0b..3005d27 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -70,7 +70,7 @@ class Scheduler(EventSource):
self._exit_stack.enter_context(self._worker)
# Start the worker and return when it has signalled readiness or raised an exception
- start_future: Future[None] = Future()
+ start_future: Future[Event] = Future()
token = self._events.subscribe(start_future.set_result)
run_future = self._executor.submit(self.run)
try:
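
Note on the sync scheduler fix: start_future's result is set by an event subscription callback, so the future's type parameter must be Event rather than None; under mypy, Future[None].set_result would reject the Event the hub delivers. A runnable sketch of the shape, with an illustrative subscribe stand-in (Future is subscriptable at runtime on Python 3.9+):

    from concurrent.futures import Future
    from dataclasses import dataclass
    from typing import Callable

    @dataclass
    class Event:
        source: str = "scheduler"

    def subscribe(callback: Callable[[Event], object]) -> None:
        # stand-in for the event hub: deliver one event to the subscriber
        callback(Event())

    start_future: Future[Event] = Future()
    subscribe(start_future.set_result)  # set_result accepts an Event
    first_event = start_future.result()  # and result() returns one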