diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-05 23:42:05 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-06 01:39:07 +0300 |
commit | 148b29270eb8fa0974f29be4d85a0ee03b848d1a (patch) | |
tree | 4d874a4bb230151eedaed19ddf14af0ec0e7c060 | |
parent | 2a685fe105b6c715c16912109dfc0f982e0acd5c (diff) | |
download | apscheduler-148b29270eb8fa0974f29be4d85a0ee03b848d1a.tar.gz |
Migrated annotations to the py3.10 style
Using "from __future__ import annotations" we can do this even on Python 3.7.
22 files changed, 187 insertions, 170 deletions
diff --git a/src/apscheduler/abc.py b/src/apscheduler/abc.py index 90346b4..1038904 100644 --- a/src/apscheduler/abc.py +++ b/src/apscheduler/abc.py @@ -3,7 +3,7 @@ from __future__ import annotations from abc import ABCMeta, abstractmethod from base64 import b64decode, b64encode from datetime import datetime -from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator, List, Optional, Set, Type +from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator, Optional, Type from uuid import UUID from .enums import ConflictPolicy @@ -124,7 +124,7 @@ class DataStore(EventSource): """ @abstractmethod - def get_tasks(self) -> List[Task]: + def get_tasks(self) -> list[Task]: """ Get all the tasks in this store. @@ -132,7 +132,7 @@ class DataStore(EventSource): """ @abstractmethod - def get_schedules(self, ids: Optional[Set[str]] = None) -> List[Schedule]: + def get_schedules(self, ids: Optional[set[str]] = None) -> list[Schedule]: """ Get schedules from the data store. @@ -159,7 +159,7 @@ class DataStore(EventSource): """ @abstractmethod - def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]: + def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: """ Acquire unclaimed due schedules for processing. @@ -172,7 +172,7 @@ class DataStore(EventSource): """ @abstractmethod - def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None: + def release_schedules(self, scheduler_id: str, schedules: list[Schedule]) -> None: """ Release the claims on the given schedules and update them on the store. @@ -196,7 +196,7 @@ class DataStore(EventSource): """ @abstractmethod - def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]: + def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> list[Job]: """ Get the list of pending jobs. @@ -205,7 +205,7 @@ class DataStore(EventSource): """ @abstractmethod - def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]: + def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> list[Job]: """ Acquire unclaimed jobs for execution. @@ -277,7 +277,7 @@ class AsyncDataStore(EventSource): """ @abstractmethod - async def get_tasks(self) -> List[Task]: + async def get_tasks(self) -> list[Task]: """ Get all the tasks in this store. @@ -285,7 +285,7 @@ class AsyncDataStore(EventSource): """ @abstractmethod - async def get_schedules(self, ids: Optional[Set[str]] = None) -> List[Schedule]: + async def get_schedules(self, ids: Optional[set[str]] = None) -> list[Schedule]: """ Get schedules from the data store. @@ -312,7 +312,7 @@ class AsyncDataStore(EventSource): """ @abstractmethod - async def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]: + async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: """ Acquire unclaimed due schedules for processing. @@ -325,7 +325,7 @@ class AsyncDataStore(EventSource): """ @abstractmethod - async def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None: + async def release_schedules(self, scheduler_id: str, schedules: list[Schedule]) -> None: """ Release the claims on the given schedules and update them on the store. @@ -349,7 +349,7 @@ class AsyncDataStore(EventSource): """ @abstractmethod - async def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]: + async def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> list[Job]: """ Get the list of pending jobs. @@ -358,7 +358,7 @@ class AsyncDataStore(EventSource): """ @abstractmethod - async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]: + async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> list[Job]: """ Acquire unclaimed jobs for execution. diff --git a/src/apscheduler/datastores/async_/sqlalchemy.py b/src/apscheduler/datastores/async_/sqlalchemy.py index 8495831..3e573b1 100644 --- a/src/apscheduler/datastores/async_/sqlalchemy.py +++ b/src/apscheduler/datastores/async_/sqlalchemy.py @@ -6,7 +6,7 @@ from collections import defaultdict from contextlib import AsyncExitStack, closing from datetime import datetime, timedelta, timezone from json import JSONDecodeError -from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Type, Union +from typing import Any, Callable, Iterable, Optional, Tuple, Type from uuid import UUID import sniffio @@ -48,7 +48,7 @@ def default_json_handler(obj: Any) -> Any: raise TypeError(f'Cannot JSON encode type {type(obj)}') -def json_object_hook(obj: Dict[str, Any]) -> Any: +def json_object_hook(obj: dict[str, Any]) -> Any: for key, value in obj.items(): if key == 'timestamp': obj[key] = datetime.fromtimestamp(value, timezone.utc) @@ -101,7 +101,7 @@ class SQLAlchemyDataStore(AsyncDataStore): self.notify_channel = None @classmethod - def from_url(cls, url: Union[str, URL], **options) -> 'SQLAlchemyDataStore': + def from_url(cls, url: str | URL, **options) -> 'SQLAlchemyDataStore': engine = create_async_engine(url, future=True) return cls(engine, **options) @@ -238,8 +238,8 @@ class SQLAlchemyDataStore(AsyncDataStore): finally: await asyncpg_conn.remove_listener(self.notify_channel, callback) - def _deserialize_jobs(self, serialized_jobs: Iterable[Tuple[UUID, bytes]]) -> List[Job]: - jobs: List[Job] = [] + def _deserialize_jobs(self, serialized_jobs: Iterable[Tuple[UUID, bytes]]) -> list[Job]: + jobs: list[Job] = [] for job_id, serialized_data in serialized_jobs: try: jobs.append(self.serializer.deserialize(serialized_data)) @@ -249,8 +249,8 @@ class SQLAlchemyDataStore(AsyncDataStore): return jobs def _deserialize_schedules( - self, serialized_schedules: Iterable[Tuple[str, bytes]]) -> List[Schedule]: - jobs: List[Schedule] = [] + self, serialized_schedules: Iterable[Tuple[str, bytes]]) -> list[Schedule]: + jobs: list[Schedule] = [] for schedule_id, serialized_data in serialized_schedules: try: jobs.append(self.serializer.deserialize(serialized_data)) @@ -307,7 +307,7 @@ class SQLAlchemyDataStore(AsyncDataStore): else: raise TaskLookupError - async def get_tasks(self) -> List[Task]: + async def get_tasks(self) -> list[Task]: query = select([self.t_tasks.c.id, self.t_tasks.c.func, self.t_tasks.c.max_running_jobs, self.t_tasks.c.state, self.t_tasks.c.misfire_grace_time]).\ order_by(self.t_tasks.c.id) @@ -357,7 +357,7 @@ class SQLAlchemyDataStore(AsyncDataStore): for schedule_id in removed_ids: await self._publish(conn, ScheduleRemoved(schedule_id=schedule_id)) - async def get_schedules(self, ids: Optional[Set[str]] = None) -> List[Schedule]: + async def get_schedules(self, ids: Optional[set[str]] = None) -> list[Schedule]: query = select([self.t_schedules.c.id, self.t_schedules.c.serialized_data]).\ order_by(self.t_schedules.c.id) if ids: @@ -367,7 +367,7 @@ class SQLAlchemyDataStore(AsyncDataStore): result = await conn.execute(query) return self._deserialize_schedules(result) - async def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]: + async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: async with self.engine.begin() as conn: now = datetime.now(timezone.utc) acquired_until = now + timedelta(seconds=self.lock_expiration_delay) @@ -396,11 +396,11 @@ class SQLAlchemyDataStore(AsyncDataStore): return schedules - async def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None: + async def release_schedules(self, scheduler_id: str, schedules: list[Schedule]) -> None: async with self.engine.begin() as conn: - update_events: List[ScheduleUpdated] = [] - finished_schedule_ids: List[str] = [] - update_args: List[Dict[str, Any]] = [] + update_events: list[ScheduleUpdated] = [] + finished_schedule_ids: list[str] = [] + update_args: list[dict[str, Any]] = [] for schedule in schedules: if schedule.next_fire_time is not None: try: @@ -476,7 +476,7 @@ class SQLAlchemyDataStore(AsyncDataStore): tags=job.tags) await self._publish(conn, event) - async def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]: + async def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> list[Job]: query = select([self.t_jobs.c.id, self.t_jobs.c.serialized_data]).\ order_by(self.t_jobs.c.id) if ids: @@ -488,7 +488,7 @@ class SQLAlchemyDataStore(AsyncDataStore): return self._deserialize_jobs(result) - async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]: + async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> list[Job]: async with self.engine.begin() as conn: now = datetime.now(timezone.utc) acquired_until = now + timedelta(seconds=self.lock_expiration_delay) @@ -505,7 +505,7 @@ class SQLAlchemyDataStore(AsyncDataStore): # Mark the jobs as acquired by this worker jobs = self._deserialize_jobs(result) - task_ids: Set[str] = {job.task_id for job in jobs} + task_ids: set[str] = {job.task_id for job in jobs} # Retrieve the limits query = select([self.t_tasks.c.id, @@ -516,8 +516,8 @@ class SQLAlchemyDataStore(AsyncDataStore): job_slots_left = dict(result.fetchall()) # Filter out jobs that don't have free slots - acquired_jobs: List[Job] = [] - increments: Dict[str, int] = defaultdict(lambda: 0) + acquired_jobs: list[Job] = [] + increments: dict[str, int] = defaultdict(lambda: 0) for job in jobs: # Don't acquire the job if there are no free slots left slots_left = job_slots_left.get(job.task_id) diff --git a/src/apscheduler/datastores/async_/sync_adapter.py b/src/apscheduler/datastores/async_/sync_adapter.py index aef79c7..fb384a1 100644 --- a/src/apscheduler/datastores/async_/sync_adapter.py +++ b/src/apscheduler/datastores/async_/sync_adapter.py @@ -2,7 +2,7 @@ from __future__ import annotations from datetime import datetime from functools import partial -from typing import Any, Callable, Iterable, List, Optional, Set, Type +from typing import Any, Callable, Iterable, Optional, Type from uuid import UUID import attr @@ -42,10 +42,10 @@ class AsyncDataStoreAdapter(AsyncDataStore): async def get_task(self, task_id: str) -> Task: return await to_thread.run_sync(self.original.get_task, task_id) - async def get_tasks(self) -> List[Task]: + async def get_tasks(self) -> list[Task]: return await to_thread.run_sync(self.original.get_tasks) - async def get_schedules(self, ids: Optional[Set[str]] = None) -> List[Schedule]: + async def get_schedules(self, ids: Optional[set[str]] = None) -> list[Schedule]: return await to_thread.run_sync(self.original.get_schedules, ids) async def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None: @@ -54,10 +54,10 @@ class AsyncDataStoreAdapter(AsyncDataStore): async def remove_schedules(self, ids: Iterable[str]) -> None: await to_thread.run_sync(self.original.remove_schedules, ids) - async def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]: + async def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: return await to_thread.run_sync(self.original.acquire_schedules, scheduler_id, limit) - async def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None: + async def release_schedules(self, scheduler_id: str, schedules: list[Schedule]) -> None: await to_thread.run_sync(self.original.release_schedules, scheduler_id, schedules) async def get_next_schedule_run_time(self) -> Optional[datetime]: @@ -66,10 +66,10 @@ class AsyncDataStoreAdapter(AsyncDataStore): async def add_job(self, job: Job) -> None: await to_thread.run_sync(self.original.add_job, job) - async def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]: + async def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> list[Job]: return await to_thread.run_sync(self.original.get_jobs, ids) - async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]: + async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> list[Job]: return await to_thread.run_sync(self.original.acquire_jobs, worker_id, limit) async def release_job(self, worker_id: str, job: Job, result: Optional[JobResult]) -> None: diff --git a/src/apscheduler/datastores/sync/memory.py b/src/apscheduler/datastores/sync/memory.py index 2e93433..16c421b 100644 --- a/src/apscheduler/datastores/sync/memory.py +++ b/src/apscheduler/datastores/sync/memory.py @@ -4,7 +4,7 @@ from bisect import bisect_left, insort_right from collections import defaultdict from datetime import MAXYEAR, datetime, timedelta, timezone from functools import partial -from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Type +from typing import Any, Callable, Iterable, Optional, Type from uuid import UUID import attr @@ -76,14 +76,14 @@ class JobState: class MemoryDataStore(DataStore): lock_expiration_delay: float = 30 _events: EventHub = attr.Factory(EventHub) - _tasks: Dict[str, TaskState] = attr.Factory(dict) - _schedules: List[ScheduleState] = attr.Factory(list) - _schedules_by_id: Dict[str, ScheduleState] = attr.Factory(dict) - _schedules_by_task_id: Dict[str, Set[ScheduleState]] = attr.Factory(partial(defaultdict, set)) - _jobs: List[JobState] = attr.Factory(list) - _jobs_by_id: Dict[UUID, JobState] = attr.Factory(dict) - _jobs_by_task_id: Dict[str, Set[JobState]] = attr.Factory(partial(defaultdict, set)) - _job_results: Dict[UUID, JobResult] = attr.Factory(dict) + _tasks: dict[str, TaskState] = attr.Factory(dict) + _schedules: list[ScheduleState] = attr.Factory(list) + _schedules_by_id: dict[str, ScheduleState] = attr.Factory(dict) + _schedules_by_task_id: dict[str, set[ScheduleState]] = attr.Factory(partial(defaultdict, set)) + _jobs: list[JobState] = attr.Factory(list) + _jobs_by_id: dict[UUID, JobState] = attr.Factory(dict) + _jobs_by_task_id: dict[str, set[JobState]] = attr.Factory(partial(defaultdict, set)) + _job_results: dict[UUID, JobResult] = attr.Factory(dict) def _find_schedule_index(self, state: ScheduleState) -> Optional[int]: left_index = bisect_left(self._schedules, state) @@ -109,7 +109,7 @@ class MemoryDataStore(DataStore): def unsubscribe(self, token: events.SubscriptionToken) -> None: self._events.unsubscribe(token) - def get_schedules(self, ids: Optional[Set[str]] = None) -> List[Schedule]: + def get_schedules(self, ids: Optional[set[str]] = None) -> list[Schedule]: return [state.schedule for state in self._schedules if ids is None or state.schedule.id in ids] @@ -131,7 +131,7 @@ class MemoryDataStore(DataStore): except KeyError: raise TaskLookupError(task_id) from None - def get_tasks(self) -> List[Task]: + def get_tasks(self) -> list[Task]: return sorted((state.task for state in self._tasks.values()), key=lambda task: task.id) def add_schedule(self, schedule: Schedule, conflict_policy: ConflictPolicy) -> None: @@ -168,9 +168,9 @@ class MemoryDataStore(DataStore): event = ScheduleRemoved(schedule_id=state.schedule.id) self._events.publish(event) - def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]: + def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: now = datetime.now(timezone.utc) - schedules: List[Schedule] = [] + schedules: list[Schedule] = [] for state in self._schedules: if state.next_fire_time is None or state.next_fire_time > now: # The schedule is either paused or not yet due @@ -189,9 +189,9 @@ class MemoryDataStore(DataStore): return schedules - def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None: + def release_schedules(self, scheduler_id: str, schedules: list[Schedule]) -> None: # Send update events for schedules that have a next time - finished_schedule_ids: List[str] = [] + finished_schedule_ids: list[str] = [] for s in schedules: if s.next_fire_time is not None: # Remove the schedule @@ -225,15 +225,15 @@ class MemoryDataStore(DataStore): tags=job.tags) self._events.publish(event) - def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]: + def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> list[Job]: if ids is not None: ids = frozenset(ids) return [state.job for state in self._jobs if ids is None or state.job.id in ids] - def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]: + def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> list[Job]: now = datetime.now(timezone.utc) - jobs: List[Job] = [] + jobs: list[Job] = [] for index, job_state in enumerate(self._jobs): task_state = self._tasks[job_state.job.task_id] diff --git a/src/apscheduler/datastores/sync/mongodb.py b/src/apscheduler/datastores/sync/mongodb.py index 3d963ac..2d25dbf 100644 --- a/src/apscheduler/datastores/sync/mongodb.py +++ b/src/apscheduler/datastores/sync/mongodb.py @@ -4,7 +4,7 @@ import logging from collections import defaultdict from contextlib import ExitStack from datetime import datetime, timezone -from typing import Any, Callable, ClassVar, Dict, Iterable, List, Optional, Set, Tuple, Type +from typing import Any, Callable, ClassVar, Iterable, Optional, Tuple, Type from uuid import UUID import attr @@ -28,9 +28,9 @@ from ...util import reentrant @reentrant class MongoDBDataStore(DataStore): - _task_attrs: ClassVar[List[str]] = [field.name for field in attr.fields(Task)] - _schedule_attrs: ClassVar[List[str]] = [field.name for field in attr.fields(Schedule)] - _job_attrs: ClassVar[List[str]] = [field.name for field in attr.fields(Job)] + _task_attrs: ClassVar[list[str]] = [field.name for field in attr.fields(Task)] + _schedule_attrs: ClassVar[list[str]] = [field.name for field in attr.fields(Schedule)] + _job_attrs: ClassVar[list[str]] = [field.name for field in attr.fields(Job)] def __init__(self, client: MongoClient, *, serializer: Optional[Serializer] = None, database: str = 'apscheduler', tasks_collection: str = 'tasks', @@ -45,7 +45,7 @@ class MongoDBDataStore(DataStore): self.serializer = serializer or PickleSerializer() self.lock_expiration_delay = lock_expiration_delay self.start_from_scratch = start_from_scratch - self._local_tasks: Dict[str, Task] = {} + self._local_tasks: dict[str, Task] = {} self._database = client[database] self._tasks: Collection = self._database[tasks_collection] self._schedules: Collection = self._database[schedules_collection] @@ -121,8 +121,8 @@ class MongoDBDataStore(DataStore): task = self._local_tasks[task_id] = Task.unmarshal(self.serializer, document) return task - def get_tasks(self) -> List[Task]: - tasks: List[Task] = [] + def get_tasks(self) -> list[Task]: + tasks: list[Task] = [] for document in self._tasks.find(projection=self._task_attrs, sort=[('_id', pymongo.ASCENDING)]): document['id'] = document.pop('_id') @@ -130,8 +130,8 @@ class MongoDBDataStore(DataStore): return tasks - def get_schedules(self, ids: Optional[Set[str]] = None) -> List[Schedule]: - schedules: List[Schedule] = [] + def get_schedules(self, ids: Optional[set[str]] = None) -> list[Schedule]: + schedules: list[Schedule] = [] filters = {'_id': {'$in': list(ids)}} if ids is not None else {} cursor = self._schedules.find(filters, projection=['_id', 'serialized_data']).sort('_id') for document in cursor: @@ -181,8 +181,8 @@ class MongoDBDataStore(DataStore): for schedule_id in ids: self._events.publish(ScheduleRemoved(schedule_id=schedule_id)) - def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]: - schedules: List[Schedule] = [] + def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: + schedules: list[Schedule] = [] with self.client.start_session() as s, s.start_transaction(): cursor = self._schedules.find( {'next_fire_time': {'$ne': None}, @@ -206,9 +206,9 @@ class MongoDBDataStore(DataStore): return schedules - def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None: - updated_schedules: List[Tuple[str, datetime]] = [] - finished_schedule_ids: List[str] = [] + def release_schedules(self, scheduler_id: str, schedules: list[Schedule]) -> None: + updated_schedules: list[Tuple[str, datetime]] = [] + finished_schedule_ids: list[str] = [] with self.client.start_session() as s, s.start_transaction(): # Update schedules that have a next fire time requests = [] @@ -272,8 +272,8 @@ class MongoDBDataStore(DataStore): tags=job.tags) self._events.publish(event) - def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]: - jobs: List[Job] = [] + def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> list[Job]: + jobs: list[Job] = [] filters = {'_id': {'$in': list(ids)}} if ids is not None else {} cursor = self._jobs.find(filters, projection=['_id', 'serialized_data']).sort('_id') for document in cursor: @@ -287,7 +287,7 @@ class MongoDBDataStore(DataStore): return jobs - def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]: + def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> list[Job]: with self.client.start_session() as session: cursor = self._jobs.find( {'$or': [{'acquired_until': {'$exists': False}}, @@ -301,7 +301,7 @@ class MongoDBDataStore(DataStore): documents = list(cursor) # Retrieve the limits - task_ids: Set[str] = {document['task_id'] for document in documents} + task_ids: set[str] = {document['task_id'] for document in documents} task_limits = self._tasks.find( {'_id': {'$in': list(task_ids)}, 'max_running_jobs': {'$ne': None}}, projection=['max_running_jobs', 'running_jobs'], @@ -311,8 +311,8 @@ class MongoDBDataStore(DataStore): for doc in task_limits} # Filter out jobs that don't have free slots - acquired_jobs: List[Job] = [] - increments: Dict[str, int] = defaultdict(lambda: 0) + acquired_jobs: list[Job] = [] + increments: dict[str, int] = defaultdict(lambda: 0) for document in documents: job = self.serializer.deserialize(document['serialized_data']) diff --git a/src/apscheduler/datastores/sync/sqlalchemy.py b/src/apscheduler/datastores/sync/sqlalchemy.py index 07118ba..07a7628 100644 --- a/src/apscheduler/datastores/sync/sqlalchemy.py +++ b/src/apscheduler/datastores/sync/sqlalchemy.py @@ -3,7 +3,7 @@ from __future__ import annotations import logging from collections import defaultdict from datetime import datetime, timedelta, timezone -from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Type, Union +from typing import Any, Callable, Iterable, Optional, Tuple, Type from uuid import UUID from sqlalchemy import ( @@ -72,7 +72,7 @@ class SQLAlchemyDataStore(DataStore): self._supports_update_returning = True @classmethod - def from_url(cls, url: Union[str, URL], **options) -> 'SQLAlchemyDataStore': + def from_url(cls, url: str | URL, **options) -> 'SQLAlchemyDataStore': engine = create_engine(url) return cls(engine, **options) @@ -151,8 +151,8 @@ class SQLAlchemyDataStore(DataStore): ) return metadata - def _deserialize_jobs(self, serialized_jobs: Iterable[Tuple[UUID, bytes]]) -> List[Job]: - jobs: List[Job] = [] + def _deserialize_jobs(self, serialized_jobs: Iterable[Tuple[UUID, bytes]]) -> list[Job]: + jobs: list[Job] = [] for job_id, serialized_data in serialized_jobs: try: jobs.append(self.serializer.deserialize(serialized_data)) @@ -162,8 +162,8 @@ class SQLAlchemyDataStore(DataStore): return jobs def _deserialize_schedules( - self, serialized_schedules: Iterable[Tuple[str, bytes]]) -> List[Schedule]: - jobs: List[Schedule] = [] + self, serialized_schedules: Iterable[Tuple[str, bytes]]) -> list[Schedule]: + jobs: list[Schedule] = [] for schedule_id, serialized_data in serialized_schedules: try: jobs.append(self.serializer.deserialize(serialized_data)) @@ -220,7 +220,7 @@ class SQLAlchemyDataStore(DataStore): else: raise TaskLookupError - def get_tasks(self) -> List[Task]: + def get_tasks(self) -> list[Task]: query = select([self.t_tasks.c.id, self.t_tasks.c.func, self.t_tasks.c.max_running_jobs, self.t_tasks.c.state, self.t_tasks.c.misfire_grace_time]).\ order_by(self.t_tasks.c.id) @@ -270,7 +270,7 @@ class SQLAlchemyDataStore(DataStore): for schedule_id in removed_ids: self._events.publish(ScheduleRemoved(schedule_id=schedule_id)) - def get_schedules(self, ids: Optional[Set[str]] = None) -> List[Schedule]: + def get_schedules(self, ids: Optional[set[str]] = None) -> list[Schedule]: query = select([self.t_schedules.c.id, self.t_schedules.c.serialized_data]).\ order_by(self.t_schedules.c.id) if ids: @@ -280,7 +280,7 @@ class SQLAlchemyDataStore(DataStore): result = conn.execute(query) return self._deserialize_schedules(result) - def acquire_schedules(self, scheduler_id: str, limit: int) -> List[Schedule]: + def acquire_schedules(self, scheduler_id: str, limit: int) -> list[Schedule]: with self.engine.begin() as conn: now = datetime.now(timezone.utc) acquired_until = now + timedelta(seconds=self.lock_expiration_delay) @@ -309,11 +309,11 @@ class SQLAlchemyDataStore(DataStore): return schedules - def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) -> None: + def release_schedules(self, scheduler_id: str, schedules: list[Schedule]) -> None: with self.engine.begin() as conn: - update_events: List[ScheduleUpdated] = [] - finished_schedule_ids: List[str] = [] - update_args: List[Dict[str, Any]] = [] + update_events: list[ScheduleUpdated] = [] + finished_schedule_ids: list[str] = [] + update_args: list[dict[str, Any]] = [] for schedule in schedules: if schedule.next_fire_time is not None: try: @@ -389,7 +389,7 @@ class SQLAlchemyDataStore(DataStore): tags=job.tags) self._events.publish(event) - def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]: + def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> list[Job]: query = select([self.t_jobs.c.id, self.t_jobs.c.serialized_data]).\ order_by(self.t_jobs.c.id) if ids: @@ -400,7 +400,7 @@ class SQLAlchemyDataStore(DataStore): result = conn.execute(query) return self._deserialize_jobs(result) - def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]: + def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> list[Job]: with self.engine.begin() as conn: now = datetime.now(timezone.utc) acquired_until = now + timedelta(seconds=self.lock_expiration_delay) @@ -417,7 +417,7 @@ class SQLAlchemyDataStore(DataStore): # Mark the jobs as acquired by this worker jobs = self._deserialize_jobs(result) - task_ids: Set[str] = {job.task_id for job in jobs} + task_ids: set[str] = {job.task_id for job in jobs} # Retrieve the limits query = select([self.t_tasks.c.id, @@ -428,8 +428,8 @@ class SQLAlchemyDataStore(DataStore): job_slots_left = dict(result.fetchall()) # Filter out jobs that don't have free slots - acquired_jobs: List[Job] = [] - increments: Dict[str, int] = defaultdict(lambda: 0) + acquired_jobs: list[Job] = [] + increments: dict[str, int] = defaultdict(lambda: 0) for job in jobs: # Don't acquire the job if there are no free slots left slots_left = job_slots_left.get(job.task_id) diff --git a/src/apscheduler/events.py b/src/apscheduler/events.py index 6c0d270..47799cb 100644 --- a/src/apscheduler/events.py +++ b/src/apscheduler/events.py @@ -9,7 +9,7 @@ from functools import partial from inspect import isawaitable from logging import Logger from traceback import format_tb -from typing import Any, Callable, Dict, FrozenSet, Iterable, NewType, Optional, Set, Type, Union +from typing import Any, Callable, Iterable, NewType, Optional, Type from uuid import UUID import attr @@ -22,7 +22,7 @@ from .structures import Job SubscriptionToken = NewType('SubscriptionToken', object) -def timestamp_to_datetime(value: Union[datetime, float, None]) -> Optional[datetime]: +def timestamp_to_datetime(value: datetime | float | None) -> Optional[datetime]: if isinstance(value, float): return datetime.fromtimestamp(value, timezone.utc) @@ -76,7 +76,7 @@ class JobAdded(DataStoreEvent): job_id: UUID task_id: str schedule_id: Optional[str] - tags: FrozenSet[str] + tags: frozenset[str] @attr.define(kw_only=True, frozen=True) @@ -221,13 +221,13 @@ class JobFailed(JobExecutionEvent): @attr.define(eq=False, frozen=True) class Subscription: callback: Callable[[Event], Any] - event_types: Optional[Set[Type[Event]]] + event_types: Optional[set[Type[Event]]] @attr.define class _BaseEventHub(abc.EventSource): _logger: Logger = attr.field(init=False, factory=lambda: logging.getLogger(__name__)) - _subscriptions: Dict[SubscriptionToken, Subscription] = attr.field(init=False, factory=dict) + _subscriptions: dict[SubscriptionToken, Subscription] = attr.field(init=False, factory=dict) def subscribe(self, callback: Callable[[Event], Any], event_types: Optional[Iterable[Type[Event]]] = None) -> SubscriptionToken: diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py index 010b3ce..abbe257 100644 --- a/src/apscheduler/schedulers/async_.py +++ b/src/apscheduler/schedulers/async_.py @@ -5,7 +5,7 @@ import platform from contextlib import AsyncExitStack from datetime import datetime, timedelta, timezone from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Mapping, Optional, Type, Union +from typing import Any, Callable, Iterable, Mapping, Optional, Type from uuid import uuid4 import anyio @@ -34,7 +34,7 @@ class AsyncScheduler(EventSource): _worker: Optional[AsyncWorker] = None _task_group: Optional[TaskGroup] = None - def __init__(self, data_store: Union[DataStore, AsyncDataStore] = None, *, + def __init__(self, data_store: DataStore | AsyncDataStore | None = None, *, identity: Optional[str] = None, logger: Optional[Logger] = None, start_worker: bool = True): self.identity = identity or f'{platform.node()}-{os.getpid()}-{id(self)}' @@ -94,7 +94,7 @@ class AsyncScheduler(EventSource): def unsubscribe(self, token: SubscriptionToken) -> None: self._events.unsubscribe(token) - # def _get_taskdef(self, func_or_id: Union[str, Callable]) -> Task: + # def _get_taskdef(self, func_or_id: str | Callable) -> Task: # task_id = func_or_id if isinstance(func_or_id, str) else callable_to_ref(func_or_id) # taskdef = self._tasks.get(task_id) # if not taskdef: @@ -114,11 +114,10 @@ class AsyncScheduler(EventSource): # pass async def add_schedule( - self, func_or_task_id: Union[str, Callable], trigger: Trigger, *, id: Optional[str] = None, + self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None, args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None, coalesce: CoalescePolicy = CoalescePolicy.latest, - misfire_grace_time: Union[float, timedelta, None] = None, - tags: Optional[Iterable[str]] = None, + misfire_grace_time: float | timedelta | None = None, tags: Optional[Iterable[str]] = None, conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing ) -> str: id = id or str(uuid4()) diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index b80210d..487b724 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -7,7 +7,7 @@ from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait from contextlib import ExitStack from datetime import datetime, timedelta, timezone from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Mapping, Optional, Type, Union +from typing import Any, Callable, Iterable, Mapping, Optional, Type from uuid import uuid4 from ..abc import DataStore, EventSource, Trigger @@ -96,10 +96,10 @@ class Scheduler(EventSource): self._events.unsubscribe(token) def add_schedule( - self, func_or_task_id: Union[str, Callable], trigger: Trigger, *, id: Optional[str] = None, + self, func_or_task_id: str | Callable, trigger: Trigger, *, id: Optional[str] = None, args: Optional[Iterable] = None, kwargs: Optional[Mapping[str, Any]] = None, coalesce: CoalescePolicy = CoalescePolicy.latest, - misfire_grace_time: Union[float, timedelta, None] = None, + misfire_grace_time: float | timedelta | None = None, tags: Optional[Iterable[str]] = None, conflict_policy: ConflictPolicy = ConflictPolicy.do_nothing ) -> str: diff --git a/src/apscheduler/serializers/cbor.py b/src/apscheduler/serializers/cbor.py index 3aefd4d..d80bce1 100644 --- a/src/apscheduler/serializers/cbor.py +++ b/src/apscheduler/serializers/cbor.py @@ -1,4 +1,6 @@ -from typing import Any, Dict +from __future__ import annotations + +from typing import Any import attr from cbor2 import CBOREncodeTypeError, CBORTag, dumps, loads @@ -10,8 +12,8 @@ from ..marshalling import marshal_object, unmarshal_object @attr.define(kw_only=True, eq=False) class CBORSerializer(Serializer): type_tag: int = 4664 - dump_options: Dict[str, Any] = attr.field(factory=dict) - load_options: Dict[str, Any] = attr.field(factory=dict) + dump_options: dict[str, Any] = attr.field(factory=dict) + load_options: dict[str, Any] = attr.field(factory=dict) def __attrs_post_init__(self): self.dump_options.setdefault('default', self._default_hook) diff --git a/src/apscheduler/serializers/json.py b/src/apscheduler/serializers/json.py index 7b64c4f..f7ef307 100644 --- a/src/apscheduler/serializers/json.py +++ b/src/apscheduler/serializers/json.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from json import dumps, loads -from typing import Any, Dict +from typing import Any import attr @@ -10,8 +12,8 @@ from ..marshalling import marshal_object, unmarshal_object @attr.define(kw_only=True, eq=False) class JSONSerializer(Serializer): magic_key: str = '_apscheduler_json' - dump_options: Dict[str, Any] = attr.field(factory=dict) - load_options: Dict[str, Any] = attr.field(factory=dict) + dump_options: dict[str, Any] = attr.field(factory=dict) + load_options: dict[str, Any] = attr.field(factory=dict) def __attrs_post_init__(self): self.dump_options['default'] = self._default_hook @@ -24,7 +26,7 @@ class JSONSerializer(Serializer): raise TypeError(f'Object of type {obj.__class__.__name__!r} is not JSON serializable') - def _object_hook(self, obj_state: Dict[str, Any]): + def _object_hook(self, obj_state: dict[str, Any]): if self.magic_key in obj_state: ref, *rest = obj_state[self.magic_key] return unmarshal_object(ref, *rest) diff --git a/src/apscheduler/structures.py b/src/apscheduler/structures.py index eeab5ac..a25d411 100644 --- a/src/apscheduler/structures.py +++ b/src/apscheduler/structures.py @@ -1,7 +1,7 @@ from __future__ import annotations from datetime import datetime, timedelta -from typing import Any, Callable, Dict, FrozenSet, Optional +from typing import Any, Callable, Optional from uuid import UUID, uuid4 import attr @@ -19,14 +19,14 @@ class Task: state: Any = None misfire_grace_time: Optional[timedelta] = attr.field(eq=False, order=False, default=None) - def marshal(self, serializer: abc.Serializer) -> Dict[str, Any]: + def marshal(self, serializer: abc.Serializer) -> dict[str, Any]: marshalled = attr.asdict(self) marshalled['func'] = callable_to_ref(self.func) marshalled['state'] = serializer.serialize(self.state) if self.state else None return marshalled @classmethod - def unmarshal(cls, serializer: abc.Serializer, marshalled: Dict[str, Any]) -> Task: + def unmarshal(cls, serializer: abc.Serializer, marshalled: dict[str, Any]) -> Task: marshalled['func'] = callable_from_ref(marshalled['func']) if marshalled['state'] is not None: marshalled['state'] = serializer.deserialize(marshalled['state']) @@ -40,16 +40,16 @@ class Schedule: task_id: str = attr.field(eq=False, order=False) trigger: abc.Trigger = attr.field(eq=False, order=False) args: tuple = attr.field(eq=False, order=False, default=()) - kwargs: Dict[str, Any] = attr.field(eq=False, order=False, factory=dict) + kwargs: dict[str, Any] = attr.field(eq=False, order=False, factory=dict) coalesce: CoalescePolicy = attr.field(eq=False, order=False, default=CoalescePolicy.latest) misfire_grace_time: Optional[timedelta] = attr.field(eq=False, order=False, default=None) - tags: FrozenSet[str] = attr.field(eq=False, order=False, factory=frozenset) + tags: frozenset[str] = attr.field(eq=False, order=False, factory=frozenset) next_fire_time: Optional[datetime] = attr.field(eq=False, order=False, init=False, default=None) last_fire_time: Optional[datetime] = attr.field(eq=False, order=False, init=False, default=None) - def marshal(self, serializer: abc.Serializer) -> Dict[str, Any]: + def marshal(self, serializer: abc.Serializer) -> dict[str, Any]: marshalled = attr.asdict(self) marshalled['trigger_type'] = serializer.serialize(self.args) marshalled['trigger_data'] = serializer.serialize(self.trigger) @@ -71,14 +71,14 @@ class Job: id: UUID = attr.field(factory=uuid4) task_id: str = attr.field(eq=False, order=False) args: tuple = attr.field(eq=False, order=False, default=()) - kwargs: Dict[str, Any] = attr.field(eq=False, order=False, factory=dict) + kwargs: dict[str, Any] = attr.field(eq=False, order=False, factory=dict) schedule_id: Optional[str] = attr.field(eq=False, order=False, default=None) scheduled_fire_time: Optional[datetime] = attr.field(eq=False, order=False, default=None) start_deadline: Optional[datetime] = attr.field(eq=False, order=False, default=None) - tags: FrozenSet[str] = attr.field(eq=False, order=False, factory=frozenset) + tags: frozenset[str] = attr.field(eq=False, order=False, factory=frozenset) started_at: Optional[datetime] = attr.field(eq=False, order=False, init=False, default=None) - def marshal(self, serializer: abc.Serializer) -> Dict[str, Any]: + def marshal(self, serializer: abc.Serializer) -> dict[str, Any]: marshalled = attr.asdict(self) marshalled['args'] = serializer.serialize(self.args) if self.args else None marshalled['kwargs'] = serializer.serialize(self.kwargs) if self.kwargs else None @@ -86,7 +86,7 @@ class Job: return marshalled @classmethod - def unmarshal(cls, serializer: abc.Serializer, marshalled: Dict[str, Any]) -> Task: + def unmarshal(cls, serializer: abc.Serializer, marshalled: dict[str, Any]) -> Task: for key in ('args', 'kwargs'): if marshalled[key] is not None: marshalled[key] = serializer.deserialize(marshalled[key]) diff --git a/src/apscheduler/triggers/calendarinterval.py b/src/apscheduler/triggers/calendarinterval.py index c6d63da..fe4e9f6 100644 --- a/src/apscheduler/triggers/calendarinterval.py +++ b/src/apscheduler/triggers/calendarinterval.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from datetime import date, datetime, time, timedelta, tzinfo -from typing import Optional, Union +from typing import Optional from ..abc import Trigger from ..marshalling import marshal_date, marshal_timezone, unmarshal_date, unmarshal_timezone @@ -56,9 +58,9 @@ class CalendarIntervalTrigger(Trigger): def __init__(self, *, years: int = 0, months: int = 0, weeks: int = 0, days: int = 0, hour: int = 0, minute: int = 0, second: int = 0, - start_date: Union[date, str, None] = None, - end_date: Union[date, str, None] = None, - timezone: Union[str, tzinfo] = 'local'): + start_date: date | str | None = None, + end_date: date | str | None = None, + timezone: str | tzinfo = 'local'): self.years = years self.months = months self.weeks = weeks diff --git a/src/apscheduler/triggers/combining.py b/src/apscheduler/triggers/combining.py index 04363fd..2bcaa7e 100644 --- a/src/apscheduler/triggers/combining.py +++ b/src/apscheduler/triggers/combining.py @@ -1,6 +1,8 @@ +from __future__ import annotations + from abc import abstractmethod from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional, Sequence, Union +from typing import Any, Optional, Sequence from ..abc import Trigger from ..exceptions import MaxIterationsReached @@ -13,9 +15,9 @@ class BaseCombiningTrigger(Trigger): def __init__(self, triggers: Sequence[Trigger]): self.triggers = as_list(triggers, Trigger, 'triggers') - self._next_fire_times: List[Optional[datetime]] = [] + self._next_fire_times: list[Optional[datetime]] = [] - def __getstate__(self) -> Dict[str, Any]: + def __getstate__(self) -> dict[str, Any]: return { 'version': 1, 'triggers': [marshal_object(trigger) for trigger in self.triggers], @@ -23,7 +25,7 @@ class BaseCombiningTrigger(Trigger): } @abstractmethod - def __setstate__(self, state: Dict[str, Any]) -> None: + def __setstate__(self, state: dict[str, Any]) -> None: self.triggers = [unmarshal_object(*trigger_state) for trigger_state in state['triggers']] self._next_fire_times = state['next_fire_times'] @@ -49,7 +51,7 @@ class AndTrigger(BaseCombiningTrigger): __slots__ = 'threshold', 'max_iterations' - def __init__(self, triggers: Sequence[Trigger], *, threshold: Union[float, timedelta] = 1, + def __init__(self, triggers: Sequence[Trigger], *, threshold: float | timedelta = 1, max_iterations: Optional[int] = 10000): super().__init__(triggers) self.threshold = as_timedelta(threshold, 'threshold') @@ -87,13 +89,13 @@ class AndTrigger(BaseCombiningTrigger): else: raise MaxIterationsReached - def __getstate__(self) -> Dict[str, Any]: + def __getstate__(self) -> dict[str, Any]: state = super().__getstate__() state['threshold'] = self.threshold.total_seconds() state['max_iterations'] = self.max_iterations return state - def __setstate__(self, state: Dict[str, Any]) -> None: + def __setstate__(self, state: dict[str, Any]) -> None: require_state_version(self, state, 1) super().__setstate__(state) self.threshold = timedelta(seconds=state['threshold']) @@ -133,7 +135,7 @@ class OrTrigger(BaseCombiningTrigger): return earliest_time - def __setstate__(self, state: Dict[str, Any]) -> None: + def __setstate__(self, state: dict[str, Any]) -> None: require_state_version(self, state, 1) super().__setstate__(state) diff --git a/src/apscheduler/triggers/cron/__init__.py b/src/apscheduler/triggers/cron/__init__.py index 682cd04..89ac676 100644 --- a/src/apscheduler/triggers/cron/__init__.py +++ b/src/apscheduler/triggers/cron/__init__.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from datetime import datetime, timedelta, tzinfo -from typing import ClassVar, List, Optional, Sequence, Tuple, Union +from typing import ClassVar, List, Optional, Sequence, Tuple from ...abc import Trigger from ...marshalling import marshal_date, marshal_timezone, unmarshal_date, unmarshal_timezone @@ -43,13 +45,13 @@ class CronTrigger(Trigger): ('second', BaseField) ] - def __init__(self, *, year: Union[int, str, None] = None, month: Union[int, str, None] = None, - day: Union[int, str, None] = None, week: Union[int, str, None] = None, - day_of_week: Union[int, str, None] = None, hour: Union[int, str, None] = None, - minute: Union[int, str, None] = None, second: Union[int, str, None] = None, - start_time: Union[datetime, str, None] = None, - end_time: Union[datetime, str, None] = None, - timezone: Union[str, tzinfo, None] = None): + def __init__(self, *, year: int | str | None = None, month: int | str | None = None, + day: int | str | None = None, week: int | str | None = None, + day_of_week: int | str | None = None, hour: int | str | None = None, + minute: int | str | None = None, second: int | str | None = None, + start_time: datetime | str | None = None, + end_time: datetime | str | None = None, + timezone: str | tzinfo | None = None): self.timezone = as_timezone(timezone) self.start_time = (as_aware_datetime(start_time, self.timezone) or datetime.now(self.timezone)) @@ -57,7 +59,7 @@ class CronTrigger(Trigger): self._set_fields([year, month, day, week, day_of_week, hour, minute, second]) self._last_fire_time: Optional[datetime] = None - def _set_fields(self, values: Sequence[Union[int, str, None]]) -> None: + def _set_fields(self, values: Sequence[int | str | None]) -> None: self._fields = [] assigned_values = {field_name: value for (field_name, _), value in zip(self.FIELDS_MAP, values) @@ -71,7 +73,7 @@ class CronTrigger(Trigger): self._fields.append(field) @classmethod - def from_crontab(cls, expr: str, timezone: Union[str, tzinfo] = 'local') -> 'CronTrigger': + def from_crontab(cls, expr: str, timezone: str | tzinfo = 'local') -> 'CronTrigger': """ Create a :class:`~CronTrigger` from a standard crontab expression. diff --git a/src/apscheduler/triggers/cron/expressions.py b/src/apscheduler/triggers/cron/expressions.py index b2d098a..e417dea 100644 --- a/src/apscheduler/triggers/cron/expressions.py +++ b/src/apscheduler/triggers/cron/expressions.py @@ -1,9 +1,10 @@ """This module contains the expressions applicable for CronTrigger's fields.""" +from __future__ import annotations import re from calendar import monthrange from datetime import datetime -from typing import Optional, Union +from typing import Optional from ...validators import as_int @@ -23,7 +24,7 @@ class AllExpression: value_re = re.compile(r'\*(?:/(?P<step>\d+))?$') - def __init__(self, step: Union[str, int, None] = None): + def __init__(self, step: str | int | None = None): self.step = as_int(step) if self.step == 0: raise ValueError('Step must be higher than 0') @@ -57,8 +58,8 @@ class RangeExpression(AllExpression): value_re = re.compile(r'(?P<first>\d+)(?:-(?P<last>\d+))?(?:/(?P<step>\d+))?$') - def __init__(self, first: Union[str, int], last: Union[str, int, None] = None, - step: Union[str, int, None] = None): + def __init__(self, first: str | int, last: str | int | None = None, + step: str | int | None = None): super().__init__(step) self.first = as_int(first) self.last = as_int(last) diff --git a/src/apscheduler/triggers/cron/fields.py b/src/apscheduler/triggers/cron/fields.py index 1e19bb9..d15fcd5 100644 --- a/src/apscheduler/triggers/cron/fields.py +++ b/src/apscheduler/triggers/cron/fields.py @@ -1,9 +1,10 @@ """Fields represent CronTrigger options which map to :class:`~datetime.datetime` fields.""" +from __future__ import annotations import re from calendar import monthrange from datetime import datetime -from typing import Any, ClassVar, List, Optional, Sequence, Union +from typing import Any, ClassVar, List, Optional, Sequence from .expressions import ( WEEKDAYS, AllExpression, LastDayOfMonthExpression, MonthRangeExpression, RangeExpression, @@ -29,7 +30,7 @@ class BaseField: if extra_compilers: cls.compilers += extra_compilers - def __init__(self, name: str, exprs: Union[int, str]): + def __init__(self, name: str, exprs: int | str): self.name = name self.expressions: List = [] for expr in SEPARATOR.split(str(exprs).strip()): diff --git a/src/apscheduler/triggers/date.py b/src/apscheduler/triggers/date.py index 636d978..c142ca7 100644 --- a/src/apscheduler/triggers/date.py +++ b/src/apscheduler/triggers/date.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from datetime import datetime, tzinfo -from typing import Optional, Union +from typing import Optional from ..abc import Trigger from ..marshalling import marshal_date, unmarshal_date @@ -17,7 +19,7 @@ class DateTrigger(Trigger): __slots__ = 'run_time', '_completed' - def __init__(self, run_time: datetime, timezone: Union[tzinfo, str] = 'local'): + def __init__(self, run_time: datetime, timezone: tzinfo | str = 'local'): timezone = as_timezone(timezone) self.run_time = as_aware_datetime(run_time, timezone) self._completed = False diff --git a/src/apscheduler/triggers/interval.py b/src/apscheduler/triggers/interval.py index bd5029a..8e933cf 100644 --- a/src/apscheduler/triggers/interval.py +++ b/src/apscheduler/triggers/interval.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from datetime import datetime, timedelta, tzinfo -from typing import Optional, Union +from typing import Optional from ..abc import Trigger from ..marshalling import marshal_date, unmarshal_date @@ -32,7 +34,7 @@ class IntervalTrigger(Trigger): def __init__(self, *, weeks: float = 0, days: float = 0, hours: float = 0, minutes: float = 0, seconds: float = 0, microseconds: float = 0, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, - timezone: Union[tzinfo, str] = 'local'): + timezone: tzinfo | str = 'local'): self.weeks = weeks self.days = days self.hours = hours diff --git a/src/apscheduler/validators.py b/src/apscheduler/validators.py index 10f47fd..6703667 100644 --- a/src/apscheduler/validators.py +++ b/src/apscheduler/validators.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import sys from datetime import date, datetime, timedelta, timezone, tzinfo -from typing import Any, Dict, Optional, Union +from typing import Any, Optional from tzlocal import get_localzone @@ -21,7 +23,7 @@ def as_int(value) -> Optional[int]: return int(value) -def as_timezone(value: Union[str, tzinfo, None]) -> tzinfo: +def as_timezone(value: str | tzinfo | None) -> tzinfo: """ Convert the value into a tzinfo object. @@ -45,7 +47,7 @@ def as_timezone(value: Union[str, tzinfo, None]) -> tzinfo: f'{value.__class__.__qualname__} instead') -def as_date(value: Union[date, str, None]) -> Optional[date]: +def as_date(value: date | str | None) -> Optional[date]: """ Convert the value to a date. @@ -81,7 +83,7 @@ def as_ordinal_date(value: Optional[date]) -> Optional[int]: return value.toordinal() -def as_aware_datetime(value: Union[datetime, str, None], tz: tzinfo) -> Optional[datetime]: +def as_aware_datetime(value: datetime | str | None, tz: tzinfo) -> Optional[datetime]: """ Convert the value to a timezone aware datetime. @@ -152,7 +154,7 @@ def as_list(value, element_type: type, name: str) -> list: return value -def require_state_version(trigger: Trigger, state: Dict[str, Any], max_version: int) -> None: +def require_state_version(trigger: Trigger, state: dict[str, Any], max_version: int) -> None: try: if state['version'] > max_version: raise DeserializationError( diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py index dbc72b3..c64d14c 100644 --- a/src/apscheduler/workers/async_.py +++ b/src/apscheduler/workers/async_.py @@ -6,7 +6,7 @@ from contextlib import AsyncExitStack from datetime import datetime, timezone from inspect import isawaitable from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Optional, Set, Type, Union +from typing import Any, Callable, Iterable, Optional, Type from uuid import UUID import anyio @@ -31,16 +31,16 @@ class AsyncWorker(EventSource): _datastore_subscription: SubscriptionToken _wakeup_event: anyio.Event - def __init__(self, data_store: Union[DataStore, AsyncDataStore], *, + def __init__(self, data_store: DataStore | AsyncDataStore, *, max_concurrent_jobs: int = 100, identity: Optional[str] = None, logger: Optional[Logger] = None): self.max_concurrent_jobs = max_concurrent_jobs self.identity = identity or f'{platform.node()}-{os.getpid()}-{id(self)}' self.logger = logger or getLogger(__name__) - self._acquired_jobs: Set[Job] = set() + self._acquired_jobs: set[Job] = set() self._exit_stack = AsyncExitStack() self._events = AsyncEventHub() - self._running_jobs: Set[UUID] = set() + self._running_jobs: set[UUID] = set() if self.max_concurrent_jobs < 1: raise ValueError('max_concurrent_jobs must be at least 1') diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py index 1fb1cba..8e023f6 100644 --- a/src/apscheduler/workers/sync.py +++ b/src/apscheduler/workers/sync.py @@ -7,7 +7,7 @@ from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait from contextlib import ExitStack from datetime import datetime, timezone from logging import Logger, getLogger -from typing import Any, Callable, Iterable, Optional, Set, Type +from typing import Any, Callable, Iterable, Optional, Type from uuid import UUID from .. import events @@ -31,10 +31,10 @@ class Worker(EventSource): self.max_concurrent_jobs = max_concurrent_jobs self.identity = identity or f'{platform.node()}-{os.getpid()}-{id(self)}' self.logger = logger or getLogger(__name__) - self._acquired_jobs: Set[Job] = set() + self._acquired_jobs: set[Job] = set() self._exit_stack = ExitStack() self._events = EventHub() - self._running_jobs: Set[UUID] = set() + self._running_jobs: set[UUID] = set() if self.max_concurrent_jobs < 1: raise ValueError('max_concurrent_jobs must be at least 1') |