Diffstat (limited to 'src/apscheduler'): 27 files changed, 283 insertions, 276 deletions
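The change is a mechanical rename: every use of the legacy attr import name (attr.define, attr.field, attr.Factory, attr.asdict, attr.fields, attr.validators, attr.converters) becomes the attrs namespace that the attrs library introduced in version 21.3.0; both names expose the same API, so behavior is unchanged. A minimal sketch of the new spelling, using a hypothetical Sample class rather than APScheduler code:

    import attrs  # modern namespace; "import attr" is the legacy spelling

    @attrs.define(kw_only=True)  # was: @attr.define(kw_only=True)
    class Sample:
        name: str
        tags: frozenset[str] = attrs.field(converter=frozenset, default=())  # was: attr.field(...)

    print(attrs.asdict(Sample(name='demo')))  # {'name': 'demo', 'tags': frozenset()}

The full diff follows.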
diff --git a/src/apscheduler/datastores/async_adapter.py b/src/apscheduler/datastores/async_adapter.py
index 736685c..3537772 100644
--- a/src/apscheduler/datastores/async_adapter.py
+++ b/src/apscheduler/datastores/async_adapter.py
@@ -6,7 +6,7 @@ from functools import partial
 from typing import Iterable, Optional
 from uuid import UUID

-import attr
+import attrs
 from anyio import to_thread
 from anyio.from_thread import BlockingPortal
@@ -18,12 +18,12 @@ from ..util import reentrant

 @reentrant
-@attr.define(eq=False)
+@attrs.define(eq=False)
 class AsyncDataStoreAdapter(AsyncDataStore):
     original: DataStore

-    _portal: BlockingPortal = attr.field(init=False)
-    _events: AsyncEventBroker = attr.field(init=False)
-    _exit_stack: AsyncExitStack = attr.field(init=False)
+    _portal: BlockingPortal = attrs.field(init=False)
+    _events: AsyncEventBroker = attrs.field(init=False)
+    _exit_stack: AsyncExitStack = attrs.field(init=False)

     @property
     def events(self) -> EventSource:
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py
index 3c6d963..4ad1483 100644
--- a/src/apscheduler/datastores/async_sqlalchemy.py
+++ b/src/apscheduler/datastores/async_sqlalchemy.py
@@ -6,7 +6,7 @@ from typing import Any, Iterable, Optional
 from uuid import UUID

 import anyio
-import attr
+import attrs
 import sniffio
 import tenacity
 from sqlalchemy import and_, bindparam, or_, select
@@ -32,12 +32,12 @@ from .sqlalchemy import _BaseSQLAlchemyDataStore


 @reentrant
-@attr.define(eq=False)
+@attrs.define(eq=False)
 class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
     engine: AsyncEngine

-    _events: AsyncEventBroker = attr.field(factory=LocalAsyncEventBroker)
-    _retrying: tenacity.AsyncRetrying = attr.field(init=False)
+    _events: AsyncEventBroker = attrs.field(factory=LocalAsyncEventBroker)
+    _retrying: tenacity.AsyncRetrying = attrs.field(init=False)

     @classmethod
     def from_url(cls, url: str | URL, **options) -> AsyncSQLAlchemyDataStore:
diff --git a/src/apscheduler/datastores/memory.py b/src/apscheduler/datastores/memory.py
index 96514ca..6b8bc7b 100644
--- a/src/apscheduler/datastores/memory.py
+++ b/src/apscheduler/datastores/memory.py
@@ -7,7 +7,7 @@ from functools import partial
 from typing import Any, Iterable, Optional
 from uuid import UUID

-import attr
+import attrs

 from ..abc import DataStore, EventBroker, EventSource, Job, Schedule
 from ..enums import ConflictPolicy
@@ -22,7 +22,7 @@ from ..util import reentrant
 max_datetime = datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)


-@attr.define
+@attrs.define
 class TaskState:
     task: Task
     running_jobs: int = 0
@@ -32,12 +32,12 @@
         return self.task.id == other.task.id


-@attr.define
+@attrs.define
 class ScheduleState:
     schedule: Schedule
-    next_fire_time: Optional[datetime] = attr.field(init=False, eq=False)
-    acquired_by: Optional[str] = attr.field(init=False, eq=False, default=None)
-    acquired_until: Optional[datetime] = attr.field(init=False, eq=False, default=None)
+    next_fire_time: Optional[datetime] = attrs.field(init=False, eq=False)
+    acquired_by: Optional[str] = attrs.field(init=False, eq=False, default=None)
+    acquired_until: Optional[datetime] = attrs.field(init=False, eq=False, default=None)

     def __attrs_post_init__(self):
         self.next_fire_time = self.schedule.next_fire_time
@@ -57,12 +57,12 @@
         return hash(self.schedule.id)


-@attr.define(order=True)
+@attrs.define(order=True)
 class JobState:
-    job: Job = attr.field(order=False)
-    created_at: datetime = attr.field(init=False, factory=partial(datetime.now, timezone.utc))
-    acquired_by: Optional[str] = attr.field(eq=False, order=False, default=None)
-    acquired_until: Optional[datetime] = attr.field(eq=False, order=False, default=None)
+    job: Job = attrs.field(order=False)
+    created_at: datetime = attrs.field(init=False, factory=partial(datetime.now, timezone.utc))
+    acquired_by: Optional[str] = attrs.field(eq=False, order=False, default=None)
+    acquired_until: Optional[datetime] = attrs.field(eq=False, order=False, default=None)

     def __eq__(self, other):
         return self.job.id == other.job.id
@@ -72,18 +72,18 @@


 @reentrant
-@attr.define(eq=False)
+@attrs.define(eq=False)
 class MemoryDataStore(DataStore):
     lock_expiration_delay: float = 30
-    _events: EventBroker = attr.Factory(LocalEventBroker)
-    _tasks: dict[str, TaskState] = attr.Factory(dict)
-    _schedules: list[ScheduleState] = attr.Factory(list)
-    _schedules_by_id: dict[str, ScheduleState] = attr.Factory(dict)
-    _schedules_by_task_id: dict[str, set[ScheduleState]] = attr.Factory(partial(defaultdict, set))
-    _jobs: list[JobState] = attr.Factory(list)
-    _jobs_by_id: dict[UUID, JobState] = attr.Factory(dict)
-    _jobs_by_task_id: dict[str, set[JobState]] = attr.Factory(partial(defaultdict, set))
-    _job_results: dict[UUID, JobResult] = attr.Factory(dict)
+    _events: EventBroker = attrs.Factory(LocalEventBroker)
+    _tasks: dict[str, TaskState] = attrs.Factory(dict)
+    _schedules: list[ScheduleState] = attrs.Factory(list)
+    _schedules_by_id: dict[str, ScheduleState] = attrs.Factory(dict)
+    _schedules_by_task_id: dict[str, set[ScheduleState]] = attrs.Factory(partial(defaultdict, set))
+    _jobs: list[JobState] = attrs.Factory(list)
+    _jobs_by_id: dict[UUID, JobState] = attrs.Factory(dict)
+    _jobs_by_task_id: dict[str, set[JobState]] = attrs.Factory(partial(defaultdict, set))
+    _job_results: dict[UUID, JobResult] = attrs.Factory(dict)

     def _find_schedule_index(self, state: ScheduleState) -> Optional[int]:
         left_index = bisect_left(self._schedules, state)
diff --git a/src/apscheduler/datastores/mongodb.py b/src/apscheduler/datastores/mongodb.py
index 4e514e2..9ece95f 100644
--- a/src/apscheduler/datastores/mongodb.py
+++ b/src/apscheduler/datastores/mongodb.py
@@ -8,10 +8,10 @@ from logging import Logger, getLogger
 from typing import Any, Callable, ClassVar, Iterable, Optional
 from uuid import UUID

-import attr
+import attrs
 import pymongo
 import tenacity
-from attr.validators import instance_of
+from attrs.validators import instance_of
 from bson import CodecOptions
 from bson.codec_options import TypeEncoder, TypeRegistry
 from pymongo import ASCENDING, DeleteOne, MongoClient, UpdateOne
@@ -46,24 +46,24 @@ class CustomEncoder(TypeEncoder):


 @reentrant
-@attr.define(eq=False)
+@attrs.define(eq=False)
 class MongoDBDataStore(DataStore):
-    client: MongoClient = attr.field(validator=instance_of(MongoClient))
-    serializer: Serializer = attr.field(factory=PickleSerializer, kw_only=True)
-    database: str = attr.field(default='apscheduler', kw_only=True)
-    lock_expiration_delay: float = attr.field(default=30, kw_only=True)
-    retry_settings: RetrySettings = attr.field(default=RetrySettings())
-    start_from_scratch: bool = attr.field(default=False, kw_only=True)
-
-    _task_attrs: ClassVar[list[str]] = [field.name for field in attr.fields(Task)]
-    _schedule_attrs: ClassVar[list[str]] = [field.name for field in attr.fields(Schedule)]
-    _job_attrs: ClassVar[list[str]] = [field.name for field in attr.fields(Job)]
-
-    _logger: Logger = attr.field(init=False, factory=lambda: getLogger(__name__))
-    _retrying: Retrying = attr.field(init=False)
-    _exit_stack: ExitStack = attr.field(init=False, factory=ExitStack)
-    _events: EventBroker = attr.field(init=False, factory=LocalEventBroker)
-    _local_tasks: dict[str, Task] = attr.field(init=False, factory=dict)
+    client: MongoClient = attrs.field(validator=instance_of(MongoClient))
+    serializer: Serializer = attrs.field(factory=PickleSerializer, kw_only=True)
+    database: str = attrs.field(default='apscheduler', kw_only=True)
+    lock_expiration_delay: float = attrs.field(default=30, kw_only=True)
+    retry_settings: RetrySettings = attrs.field(default=RetrySettings())
+    start_from_scratch: bool = attrs.field(default=False, kw_only=True)
+
+    _task_attrs: ClassVar[list[str]] = [field.name for field in attrs.fields(Task)]
+    _schedule_attrs: ClassVar[list[str]] = [field.name for field in attrs.fields(Schedule)]
+    _job_attrs: ClassVar[list[str]] = [field.name for field in attrs.fields(Job)]
+
+    _logger: Logger = attrs.field(init=False, factory=lambda: getLogger(__name__))
+    _retrying: Retrying = attrs.field(init=False)
+    _exit_stack: ExitStack = attrs.field(init=False, factory=ExitStack)
+    _events: EventBroker = attrs.field(init=False, factory=LocalEventBroker)
+    _local_tasks: dict[str, Task] = attrs.field(init=False, factory=dict)

     def __attrs_post_init__(self) -> None:
         # Construct the Tenacity retry controller
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index a52d6c1..86e8138 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -6,7 +6,7 @@ from logging import Logger, getLogger
 from typing import Any, Iterable, Optional
 from uuid import UUID

-import attr
+import attrs
 import tenacity
 from sqlalchemy import (
     JSON, TIMESTAMP, BigInteger, Column, Enum, Integer, LargeBinary, MetaData, Table,
@@ -64,17 +64,17 @@ class EmulatedInterval(TypeDecorator):
         return timedelta(seconds=value) if value is not None else None


-@attr.define(kw_only=True, eq=False)
+@attrs.define(kw_only=True, eq=False)
 class _BaseSQLAlchemyDataStore:
-    schema: Optional[str] = attr.field(default=None)
-    serializer: Serializer = attr.field(factory=PickleSerializer)
-    lock_expiration_delay: float = attr.field(default=30)
-    max_poll_time: Optional[float] = attr.field(default=1)
-    max_idle_time: float = attr.field(default=60)
-    retry_settings: RetrySettings = attr.field(default=RetrySettings())
-    start_from_scratch: bool = attr.field(default=False)
+    schema: Optional[str] = attrs.field(default=None)
+    serializer: Serializer = attrs.field(factory=PickleSerializer)
+    lock_expiration_delay: float = attrs.field(default=30)
+    max_poll_time: Optional[float] = attrs.field(default=1)
+    max_idle_time: float = attrs.field(default=60)
+    retry_settings: RetrySettings = attrs.field(default=RetrySettings())
+    start_from_scratch: bool = attrs.field(default=False)

-    _logger: Logger = attr.field(init=False, factory=lambda: getLogger(__name__))
+    _logger: Logger = attrs.field(init=False, factory=lambda: getLogger(__name__))

     def __attrs_post_init__(self) -> None:
         # Generate the table definitions
@@ -195,12 +195,12 @@

 @reentrant
-@attr.define(eq=False)
+@attrs.define(eq=False)
 class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
     engine: Engine

-    _events: EventBroker = attr.field(init=False, factory=LocalEventBroker)
-    _retrying: tenacity.Retrying = attr.field(init=False)
+    _events: EventBroker = attrs.field(init=False, factory=LocalEventBroker)
+    _retrying: tenacity.Retrying = attrs.field(init=False)

     @classmethod
     def from_url(cls, url: str | URL, **options) -> SQLAlchemyDataStore:
diff --git a/src/apscheduler/eventbrokers/async_adapter.py b/src/apscheduler/eventbrokers/async_adapter.py
index a91aae3..3f1e13f 100644
--- a/src/apscheduler/eventbrokers/async_adapter.py
+++ b/src/apscheduler/eventbrokers/async_adapter.py
@@ -2,7 +2,7 @@ from __future__ import annotations

 from functools import partial

-import attr
+import attrs
 from anyio import to_thread
 from anyio.from_thread import BlockingPortal
@@ -13,7 +13,7 @@ from apscheduler.util import reentrant


 @reentrant
-@attr.define(eq=False)
+@attrs.define(eq=False)
 class AsyncEventBrokerAdapter(LocalAsyncEventBroker):
     original: EventBroker
     portal: BlockingPortal
diff --git a/src/apscheduler/eventbrokers/async_local.py b/src/apscheduler/eventbrokers/async_local.py
index 79030f3..cb0fb96 100644
--- a/src/apscheduler/eventbrokers/async_local.py
+++ b/src/apscheduler/eventbrokers/async_local.py
@@ -4,7 +4,7 @@ from asyncio import iscoroutine
 from contextlib import AsyncExitStack
 from typing import Any, Callable

-import attr
+import attrs
 from anyio import create_task_group
 from anyio.abc import TaskGroup
@@ -15,10 +15,10 @@ from .base import BaseEventBroker


 @reentrant
-@attr.define(eq=False)
+@attrs.define(eq=False)
 class LocalAsyncEventBroker(AsyncEventBroker, BaseEventBroker):
-    _task_group: TaskGroup = attr.field(init=False)
-    _exit_stack: AsyncExitStack = attr.field(init=False)
+    _task_group: TaskGroup = attrs.field(init=False)
+    _exit_stack: AsyncExitStack = attrs.field(init=False)

     async def __aenter__(self) -> LocalAsyncEventBroker:
         self._exit_stack = AsyncExitStack()
diff --git a/src/apscheduler/eventbrokers/asyncpg.py b/src/apscheduler/eventbrokers/asyncpg.py
index 93bfd6a..ca58da9 100644
--- a/src/apscheduler/eventbrokers/asyncpg.py
+++ b/src/apscheduler/eventbrokers/asyncpg.py
@@ -3,7 +3,7 @@ from __future__ import annotations
 from contextlib import asynccontextmanager
 from typing import TYPE_CHECKING, AsyncContextManager, AsyncGenerator, Callable

-import attr
+import attrs
 from anyio import TASK_STATUS_IGNORED, sleep
 from asyncpg import Connection
 from asyncpg.pool import Pool
@@ -19,11 +19,11 @@ if TYPE_CHECKING:


 @reentrant
-@attr.define(eq=False)
+@attrs.define(eq=False)
 class AsyncpgEventBroker(LocalAsyncEventBroker, DistributedEventBrokerMixin):
     connection_factory: Callable[[], AsyncContextManager[Connection]]
-    channel: str = attr.field(kw_only=True, default='apscheduler')
-    max_idle_time: float = attr.field(kw_only=True, default=30)
+    channel: str = attrs.field(kw_only=True, default='apscheduler')
+    max_idle_time: float = attrs.field(kw_only=True, default=30)

     @classmethod
     def from_asyncpg_pool(cls, pool: Pool) -> AsyncpgEventBroker:
diff --git a/src/apscheduler/eventbrokers/base.py b/src/apscheduler/eventbrokers/base.py
index 9947f68..8ec4dcb 100644
--- a/src/apscheduler/eventbrokers/base.py
+++ b/src/apscheduler/eventbrokers/base.py
@@ -4,7 +4,7 @@ from base64 import b64decode, b64encode
 from logging import Logger, getLogger
 from typing import Any, Callable, Iterable, Optional

-import attr
+import attrs

 from .. import events
 from ..abc import EventBroker, Serializer, Subscription
@@ -12,7 +12,7 @@ from ..events import Event
 from ..exceptions import DeserializationError


-@attr.define(eq=False, frozen=True)
+@attrs.define(eq=False, frozen=True)
 class LocalSubscription(Subscription):
     callback: Callable[[Event], Any]
     event_types: Optional[set[type[Event]]]
@@ -24,10 +24,10 @@
         self._source.unsubscribe(self.token)


-@attr.define(eq=False)
+@attrs.define(eq=False)
 class BaseEventBroker(EventBroker):
-    _logger: Logger = attr.field(init=False)
-    _subscriptions: dict[object, LocalSubscription] = attr.field(init=False, factory=dict)
+    _logger: Logger = attrs.field(init=False)
+    _subscriptions: dict[object, LocalSubscription] = attrs.field(init=False, factory=dict)

     def __attrs_post_init__(self) -> None:
         self._logger = getLogger(self.__class__.__module__)
@@ -50,11 +50,11 @@ class DistributedEventBrokerMixin:
     _logger: Logger

     def generate_notification(self, event: Event) -> bytes:
-        serialized = self.serializer.serialize(attr.asdict(event))
+        serialized = self.serializer.serialize(attrs.asdict(event))
         return event.__class__.__name__.encode('ascii') + b' ' + serialized

     def generate_notification_str(self, event: Event) -> str:
-        serialized = self.serializer.serialize(attr.asdict(event))
+        serialized = self.serializer.serialize(attrs.asdict(event))
         return event.__class__.__name__ + ' ' + b64encode(serialized).decode('ascii')

     def _reconstitute_event(self, event_type: str, serialized: bytes) -> Optional[Event]:
diff --git a/src/apscheduler/eventbrokers/local.py b/src/apscheduler/eventbrokers/local.py
index acf0c9a..fd345a1 100644
--- a/src/apscheduler/eventbrokers/local.py
+++ b/src/apscheduler/eventbrokers/local.py
@@ -6,7 +6,7 @@ from contextlib import ExitStack
 from threading import Lock
 from typing import Any, Callable, Iterable, Optional

-import attr
+import attrs

 from ..abc import Subscription
 from ..events import Event
@@ -15,11 +15,11 @@ from .base import BaseEventBroker


 @reentrant
-@attr.define(eq=False)
+@attrs.define(eq=False)
 class LocalEventBroker(BaseEventBroker):
-    _executor: ThreadPoolExecutor = attr.field(init=False)
-    _exit_stack: ExitStack = attr.field(init=False)
-    _subscriptions_lock: Lock = attr.field(init=False, factory=Lock)
+    _executor: ThreadPoolExecutor = attrs.field(init=False)
+    _exit_stack: ExitStack = attrs.field(init=False)
+    _subscriptions_lock: Lock = attrs.field(init=False, factory=Lock)

     def __enter__(self):
         self._exit_stack = ExitStack()
diff --git a/src/apscheduler/eventbrokers/mqtt.py b/src/apscheduler/eventbrokers/mqtt.py
index dbdffe4..60e7195 100644
--- a/src/apscheduler/eventbrokers/mqtt.py
+++ b/src/apscheduler/eventbrokers/mqtt.py
@@ -3,7 +3,7 @@ from __future__ import annotations
 from concurrent.futures import Future
 from typing import Any, Optional

-import attr
+import attrs
 from paho.mqtt.client import Client, MQTTMessage
 from paho.mqtt.properties import Properties
 from paho.mqtt.reasoncodes import ReasonCodes
@@ -17,16 +17,16 @@ from .local import LocalEventBroker


 @reentrant
-@attr.define(eq=False)
+@attrs.define(eq=False)
 class MQTTEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
     client: Client
-    serializer: Serializer = attr.field(factory=JSONSerializer)
-    host: str = attr.field(kw_only=True, default='localhost')
-    port: int = attr.field(kw_only=True, default=1883)
-    topic: str = attr.field(kw_only=True, default='apscheduler')
-    subscribe_qos: int = attr.field(kw_only=True, default=0)
-    publish_qos: int = attr.field(kw_only=True, default=0)
-    _ready_future: Future[None] = attr.field(init=False)
+    serializer: Serializer = attrs.field(factory=JSONSerializer)
+    host: str = attrs.field(kw_only=True, default='localhost')
+    port: int = attrs.field(kw_only=True, default=1883)
+    topic: str = attrs.field(kw_only=True, default='apscheduler')
+    subscribe_qos: int = attrs.field(kw_only=True, default=0)
+    publish_qos: int = attrs.field(kw_only=True, default=0)
+    _ready_future: Future[None] = attrs.field(init=False)

     def __enter__(self):
         super().__enter__()
diff --git a/src/apscheduler/eventbrokers/redis.py b/src/apscheduler/eventbrokers/redis.py
index 68b86e0..eae5678 100644
--- a/src/apscheduler/eventbrokers/redis.py
+++ b/src/apscheduler/eventbrokers/redis.py
@@ -4,7 +4,7 @@ from concurrent.futures import Future
 from threading import Thread
 from typing import Optional

-import attr
+import attrs
 from redis import ConnectionPool, Redis

 from ..abc import Serializer
@@ -16,14 +16,14 @@ from .local import LocalEventBroker


 @reentrant
-@attr.define(eq=False)
+@attrs.define(eq=False)
 class RedisEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
     client: Redis
-    serializer: Serializer = attr.field(factory=JSONSerializer)
-    channel: str = attr.field(kw_only=True, default='apscheduler')
-    message_poll_interval: float = attr.field(kw_only=True, default=0.05)
-    _stopped: bool = attr.field(init=False, default=True)
-    _ready_future: Future[None] = attr.field(init=False)
+    serializer: Serializer = attrs.field(factory=JSONSerializer)
+    channel: str = attrs.field(kw_only=True, default='apscheduler')
+    message_poll_interval: float = attrs.field(kw_only=True, default=0.05)
+    _stopped: bool = attrs.field(init=False, default=True)
+    _ready_future: Future[None] = attrs.field(init=False)

     @classmethod
     def from_url(cls, url: str, db: Optional[str] = None, decode_components: bool = False,
diff --git a/src/apscheduler/events.py b/src/apscheduler/events.py
index 86e98c7..9eb1c7a 100644
--- a/src/apscheduler/events.py
+++ b/src/apscheduler/events.py
@@ -5,82 +5,82 @@ from functools import partial
 from typing import Optional
 from uuid import UUID

-import attr
-from attr.converters import optional
+import attrs
+from attrs.converters import optional

 from .converters import as_aware_datetime, as_uuid
 from .enums import JobOutcome


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class Event:
-    timestamp: datetime = attr.field(factory=partial(datetime.now, timezone.utc),
-                                     converter=as_aware_datetime)
+    timestamp: datetime = attrs.field(factory=partial(datetime.now, timezone.utc),
+                                      converter=as_aware_datetime)


 #
 # Data store events
 #

-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class DataStoreEvent(Event):
     pass


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class TaskAdded(DataStoreEvent):
     task_id: str


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class TaskUpdated(DataStoreEvent):
     task_id: str


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class TaskRemoved(DataStoreEvent):
     task_id: str


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class ScheduleAdded(DataStoreEvent):
     schedule_id: str
-    next_fire_time: Optional[datetime] = attr.field(converter=optional(as_aware_datetime))
+    next_fire_time: Optional[datetime] = attrs.field(converter=optional(as_aware_datetime))


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class ScheduleUpdated(DataStoreEvent):
     schedule_id: str
-    next_fire_time: Optional[datetime] = attr.field(converter=optional(as_aware_datetime))
+    next_fire_time: Optional[datetime] = attrs.field(converter=optional(as_aware_datetime))


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class ScheduleRemoved(DataStoreEvent):
     schedule_id: str


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class JobAdded(DataStoreEvent):
-    job_id: UUID = attr.field(converter=as_uuid)
+    job_id: UUID = attrs.field(converter=as_uuid)
     task_id: str
     schedule_id: Optional[str]
-    tags: frozenset[str] = attr.field(converter=frozenset)
+    tags: frozenset[str] = attrs.field(converter=frozenset)


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class JobRemoved(DataStoreEvent):
-    job_id: UUID = attr.field(converter=as_uuid)
+    job_id: UUID = attrs.field(converter=as_uuid)


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class ScheduleDeserializationFailed(DataStoreEvent):
     schedule_id: str
     exception: BaseException


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class JobDeserializationFailed(DataStoreEvent):
-    job_id: UUID = attr.field(converter=as_uuid)
+    job_id: UUID = attrs.field(converter=as_uuid)
     exception: BaseException
@@ -88,17 +88,17 @@ class JobDeserializationFailed(DataStoreEvent):
 # Scheduler events
 #

-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class SchedulerEvent(Event):
     pass


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class SchedulerStarted(SchedulerEvent):
     pass


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class SchedulerStopped(SchedulerEvent):
     exception: Optional[BaseException] = None
@@ -107,33 +107,33 @@
 # Worker events
 #

-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class WorkerEvent(Event):
     pass


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class WorkerStarted(WorkerEvent):
     pass


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class WorkerStopped(WorkerEvent):
     exception: Optional[BaseException] = None


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class JobAcquired(WorkerEvent):
     """Signals that a worker has acquired a job for processing."""

-    job_id: UUID = attr.field(converter=as_uuid)
+    job_id: UUID = attrs.field(converter=as_uuid)
     worker_id: str


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class JobReleased(WorkerEvent):
     """Signals that a worker has finished processing of a job."""

-    job_id: UUID = attr.field(converter=as_uuid)
+    job_id: UUID = attrs.field(converter=as_uuid)
     worker_id: str
     outcome: JobOutcome
diff --git a/src/apscheduler/schedulers/async_.py b/src/apscheduler/schedulers/async_.py
index a259678..61e2e64 100644
--- a/src/apscheduler/schedulers/async_.py
+++ b/src/apscheduler/schedulers/async_.py
@@ -10,7 +10,7 @@ from typing import Any, Callable, Iterable, Mapping, Optional
 from uuid import UUID, uuid4

 import anyio
-import attr
+import attrs
 from anyio import TASK_STATUS_IGNORED, create_task_group, get_cancelled_exc_class, move_on_after

 from ..abc import AsyncDataStore, EventSource, Job, Schedule, Trigger
@@ -30,20 +30,20 @@
 _microsecond_delta = timedelta(microseconds=1)
 _zero_timedelta = timedelta()


-@attr.define(eq=False)
+@attrs.define(eq=False)
 class AsyncScheduler:
     """An asynchronous (AnyIO based) scheduler implementation."""

-    data_store: AsyncDataStore = attr.field(converter=as_async_datastore, factory=MemoryDataStore)
-    identity: str = attr.field(kw_only=True, default=None)
-    start_worker: bool = attr.field(kw_only=True, default=True)
-    logger: Optional[Logger] = attr.field(kw_only=True, default=getLogger(__name__))
+    data_store: AsyncDataStore = attrs.field(converter=as_async_datastore, factory=MemoryDataStore)
+    identity: str = attrs.field(kw_only=True, default=None)
+    start_worker: bool = attrs.field(kw_only=True, default=True)
+    logger: Optional[Logger] = attrs.field(kw_only=True, default=getLogger(__name__))

-    _state: RunState = attr.field(init=False, default=RunState.stopped)
-    _wakeup_event: anyio.Event = attr.field(init=False)
-    _worker: Optional[AsyncWorker] = attr.field(init=False, default=None)
-    _events: LocalAsyncEventBroker = attr.field(init=False, factory=LocalAsyncEventBroker)
-    _exit_stack: AsyncExitStack = attr.field(init=False)
+    _state: RunState = attrs.field(init=False, default=RunState.stopped)
+    _wakeup_event: anyio.Event = attrs.field(init=False)
+    _worker: Optional[AsyncWorker] = attrs.field(init=False, default=None)
+    _events: LocalAsyncEventBroker = attrs.field(init=False, factory=LocalAsyncEventBroker)
+    _exit_stack: AsyncExitStack = attrs.field(init=False)

     def __attrs_post_init__(self) -> None:
         if not self.identity:
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index cc77d45..22270f0 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -11,7 +11,7 @@ from logging import Logger, getLogger
 from typing import Any, Callable, Iterable, Mapping, Optional
 from uuid import UUID, uuid4

-import attr
+import attrs

 from ..abc import DataStore, EventSource, Trigger
 from ..context import current_scheduler
@@ -29,20 +29,20 @@
 _microsecond_delta = timedelta(microseconds=1)
 _zero_timedelta = timedelta()


-@attr.define(eq=False)
+@attrs.define(eq=False)
 class Scheduler:
     """A synchronous scheduler implementation."""

-    data_store: DataStore = attr.field(factory=MemoryDataStore)
-    identity: str = attr.field(kw_only=True, default=None)
-    start_worker: bool = attr.field(kw_only=True, default=True)
-    logger: Optional[Logger] = attr.field(kw_only=True, default=getLogger(__name__))
+    data_store: DataStore = attrs.field(factory=MemoryDataStore)
+    identity: str = attrs.field(kw_only=True, default=None)
+    start_worker: bool = attrs.field(kw_only=True, default=True)
+    logger: Optional[Logger] = attrs.field(kw_only=True, default=getLogger(__name__))

-    _state: RunState = attr.field(init=False, default=RunState.stopped)
-    _wakeup_event: threading.Event = attr.field(init=False)
-    _worker: Optional[Worker] = attr.field(init=False, default=None)
-    _events: LocalEventBroker = attr.field(init=False, factory=LocalEventBroker)
-    _exit_stack: ExitStack = attr.field(init=False)
+    _state: RunState = attrs.field(init=False, default=RunState.stopped)
+    _wakeup_event: threading.Event = attrs.field(init=False)
+    _worker: Optional[Worker] = attrs.field(init=False, default=None)
+    _events: LocalEventBroker = attrs.field(init=False, factory=LocalEventBroker)
+    _exit_stack: ExitStack = attrs.field(init=False)

     def __attrs_post_init__(self) -> None:
         if not self.identity:
diff --git a/src/apscheduler/serializers/cbor.py b/src/apscheduler/serializers/cbor.py
index d80bce1..bb610c5 100644
--- a/src/apscheduler/serializers/cbor.py
+++ b/src/apscheduler/serializers/cbor.py
@@ -2,18 +2,18 @@ from __future__ import annotations

 from typing import Any

-import attr
+import attrs
 from cbor2 import CBOREncodeTypeError, CBORTag, dumps, loads

 from ..abc import Serializer
 from ..marshalling import marshal_object, unmarshal_object


-@attr.define(kw_only=True, eq=False)
+@attrs.define(kw_only=True, eq=False)
 class CBORSerializer(Serializer):
     type_tag: int = 4664
-    dump_options: dict[str, Any] = attr.field(factory=dict)
-    load_options: dict[str, Any] = attr.field(factory=dict)
+    dump_options: dict[str, Any] = attrs.field(factory=dict)
+    load_options: dict[str, Any] = attrs.field(factory=dict)

     def __attrs_post_init__(self):
         self.dump_options.setdefault('default', self._default_hook)
diff --git a/src/apscheduler/serializers/json.py b/src/apscheduler/serializers/json.py
index 8bfe6d7..02ad12f 100644
--- a/src/apscheduler/serializers/json.py
+++ b/src/apscheduler/serializers/json.py
@@ -4,17 +4,17 @@ from datetime import datetime
 from json import dumps, loads
 from typing import Any

-import attr
+import attrs

 from ..abc import Serializer
 from ..marshalling import marshal_date, marshal_object, unmarshal_object


-@attr.define(kw_only=True, eq=False)
+@attrs.define(kw_only=True, eq=False)
 class JSONSerializer(Serializer):
     magic_key: str = '_apscheduler_json'
-    dump_options: dict[str, Any] = attr.field(factory=dict)
-    load_options: dict[str, Any] = attr.field(factory=dict)
+    dump_options: dict[str, Any] = attrs.field(factory=dict)
+    load_options: dict[str, Any] = attrs.field(factory=dict)

     def __attrs_post_init__(self):
         self.dump_options['default'] = self._default_hook
diff --git a/src/apscheduler/serializers/pickle.py b/src/apscheduler/serializers/pickle.py
index 9092ec7..d03bdc0 100644
--- a/src/apscheduler/serializers/pickle.py
+++ b/src/apscheduler/serializers/pickle.py
@@ -1,11 +1,11 @@
 from pickle import dumps, loads

-import attr
+import attrs

 from ..abc import Serializer


-@attr.define(kw_only=True, eq=False)
+@attrs.define(kw_only=True, eq=False)
 class PickleSerializer(Serializer):
     protocol: int = 4
diff --git a/src/apscheduler/structures.py b/src/apscheduler/structures.py
index cffffc7..7f6457d 100644
--- a/src/apscheduler/structures.py
+++ b/src/apscheduler/structures.py
@@ -5,10 +5,10 @@ from functools import partial
 from typing import Any, Callable, Optional
 from uuid import UUID, uuid4

-import attr
+import attrs
 import tenacity.stop
 import tenacity.wait
-from attr.validators import instance_of
+from attrs.validators import instance_of

 from . import abc
 from .converters import as_enum, as_timedelta
@@ -16,16 +16,23 @@ from .enums import CoalescePolicy, JobOutcome
 from .marshalling import callable_from_ref, callable_to_ref


-@attr.define(kw_only=True)
+def serialize(inst, field, value):
+    if isinstance(value, frozenset):
+        return list(value)
+
+    return value
+
+
+@attrs.define(kw_only=True)
 class Task:
     id: str
-    func: Callable = attr.field(eq=False, order=False)
-    max_running_jobs: Optional[int] = attr.field(eq=False, order=False, default=None)
-    misfire_grace_time: Optional[timedelta] = attr.field(eq=False, order=False, default=None)
+    func: Callable = attrs.field(eq=False, order=False)
+    max_running_jobs: Optional[int] = attrs.field(eq=False, order=False, default=None)
+    misfire_grace_time: Optional[timedelta] = attrs.field(eq=False, order=False, default=None)
     state: Any = None

     def marshal(self, serializer: abc.Serializer) -> dict[str, Any]:
-        marshalled = attr.asdict(self)
+        marshalled = attrs.asdict(self, value_serializer=serialize)
         marshalled['func'] = callable_to_ref(self.func)
         marshalled['state'] = serializer.serialize(self.state) if self.state else None
         return marshalled
@@ -39,27 +46,27 @@
         return cls(**marshalled)


-@attr.define(kw_only=True)
+@attrs.define(kw_only=True)
 class Schedule:
     id: str
-    task_id: str = attr.field(eq=False, order=False)
-    trigger: abc.Trigger = attr.field(eq=False, order=False)
-    args: tuple = attr.field(eq=False, order=False, converter=tuple, default=())
-    kwargs: dict[str, Any] = attr.field(eq=False, order=False, converter=dict, default=())
-    coalesce: CoalescePolicy = attr.field(eq=False, order=False, default=CoalescePolicy.latest,
-                                          converter=as_enum(CoalescePolicy))
-    misfire_grace_time: Optional[timedelta] = attr.field(eq=False, order=False, default=None,
-                                                         converter=as_timedelta)
-    max_jitter: Optional[timedelta] = attr.field(eq=False, order=False, converter=as_timedelta,
-                                                 default=None)
-    tags: frozenset[str] = attr.field(eq=False, order=False, converter=frozenset, default=())
-    next_fire_time: Optional[datetime] = attr.field(eq=False, order=False, default=None)
-    last_fire_time: Optional[datetime] = attr.field(eq=False, order=False, default=None)
-    acquired_by: Optional[str] = attr.field(eq=False, order=False, default=None)
-    acquired_until: Optional[datetime] = attr.field(eq=False, order=False, default=None)
+    task_id: str = attrs.field(eq=False, order=False)
+    trigger: abc.Trigger = attrs.field(eq=False, order=False)
+    args: tuple = attrs.field(eq=False, order=False, converter=tuple, default=())
+    kwargs: dict[str, Any] = attrs.field(eq=False, order=False, converter=dict, default=())
+    coalesce: CoalescePolicy = attrs.field(eq=False, order=False, default=CoalescePolicy.latest,
+                                           converter=as_enum(CoalescePolicy))
+    misfire_grace_time: Optional[timedelta] = attrs.field(eq=False, order=False, default=None,
+                                                          converter=as_timedelta)
+    max_jitter: Optional[timedelta] = attrs.field(eq=False, order=False, converter=as_timedelta,
+                                                  default=None)
+    tags: frozenset[str] = attrs.field(eq=False, order=False, converter=frozenset, default=())
+    next_fire_time: Optional[datetime] = attrs.field(eq=False, order=False, default=None)
+    last_fire_time: Optional[datetime] = attrs.field(eq=False, order=False, default=None)
+    acquired_by: Optional[str] = attrs.field(eq=False, order=False, default=None)
+    acquired_until: Optional[datetime] = attrs.field(eq=False, order=False, default=None)

     def marshal(self, serializer: abc.Serializer) -> dict[str, Any]:
-        marshalled = attr.asdict(self)
+        marshalled = attrs.asdict(self, value_serializer=serialize)
         marshalled['trigger'] = serializer.serialize(self.trigger)
         marshalled['args'] = serializer.serialize(self.args)
         marshalled['kwargs'] = serializer.serialize(self.kwargs)
@@ -84,23 +91,23 @@
         return None


-@attr.define(kw_only=True)
+@attrs.define(kw_only=True)
 class Job:
-    id: UUID = attr.field(factory=uuid4)
-    task_id: str = attr.field(eq=False, order=False)
-    args: tuple = attr.field(eq=False, order=False, converter=tuple, default=())
-    kwargs: dict[str, Any] = attr.field(eq=False, order=False, converter=dict, default=())
-    schedule_id: Optional[str] = attr.field(eq=False, order=False, default=None)
-    scheduled_fire_time: Optional[datetime] = attr.field(eq=False, order=False, default=None)
-    jitter: timedelta = attr.field(eq=False, order=False, converter=as_timedelta,
-                                   factory=timedelta)
-    start_deadline: Optional[datetime] = attr.field(eq=False, order=False, default=None)
-    tags: frozenset[str] = attr.field(eq=False, order=False, converter=frozenset, default=())
-    created_at: datetime = attr.field(eq=False, order=False,
-                                      factory=partial(datetime.now, timezone.utc))
-    started_at: Optional[datetime] = attr.field(eq=False, order=False, default=None)
-    acquired_by: Optional[str] = attr.field(eq=False, order=False, default=None)
-    acquired_until: Optional[datetime] = attr.field(eq=False, order=False, default=None)
+    id: UUID = attrs.field(factory=uuid4)
+    task_id: str = attrs.field(eq=False, order=False)
+    args: tuple = attrs.field(eq=False, order=False, converter=tuple, default=())
+    kwargs: dict[str, Any] = attrs.field(eq=False, order=False, converter=dict, default=())
+    schedule_id: Optional[str] = attrs.field(eq=False, order=False, default=None)
+    scheduled_fire_time: Optional[datetime] = attrs.field(eq=False, order=False, default=None)
+    jitter: timedelta = attrs.field(eq=False, order=False, converter=as_timedelta,
+                                    factory=timedelta)
+    start_deadline: Optional[datetime] = attrs.field(eq=False, order=False, default=None)
+    tags: frozenset[str] = attrs.field(eq=False, order=False, converter=frozenset, default=())
+    created_at: datetime = attrs.field(eq=False, order=False,
+                                       factory=partial(datetime.now, timezone.utc))
+    started_at: Optional[datetime] = attrs.field(eq=False, order=False, default=None)
+    acquired_by: Optional[str] = attrs.field(eq=False, order=False, default=None)
+    acquired_until: Optional[datetime] = attrs.field(eq=False, order=False, default=None)

     @property
     def original_scheduled_time(self) -> Optional[datetime]:
@@ -111,7 +118,7 @@
         return self.scheduled_fire_time - self.jitter

     def marshal(self, serializer: abc.Serializer) -> dict[str, Any]:
-        marshalled = attr.asdict(self)
+        marshalled = attrs.asdict(self, value_serializer=serialize)
         marshalled['args'] = serializer.serialize(self.args)
         marshalled['kwargs'] = serializer.serialize(self.kwargs)
         if not self.acquired_by:
@@ -127,7 +134,7 @@
         return cls(**marshalled)


-@attr.define(kw_only=True)
+@attrs.define(kw_only=True)
 class JobInfo:
     job_id: UUID
     task_id: str
@@ -144,17 +151,17 @@
             start_deadline=job.start_deadline, tags=job.tags)


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class JobResult:
     job_id: UUID
-    outcome: JobOutcome = attr.field(eq=False, order=False, converter=as_enum(JobOutcome))
-    finished_at: datetime = attr.field(eq=False, order=False,
-                                       factory=partial(datetime.now, timezone.utc))
-    exception: Optional[BaseException] = attr.field(eq=False, order=False, default=None)
-    return_value: Any = attr.field(eq=False, order=False, default=None)
+    outcome: JobOutcome = attrs.field(eq=False, order=False, converter=as_enum(JobOutcome))
+    finished_at: datetime = attrs.field(eq=False, order=False,
+                                        factory=partial(datetime.now, timezone.utc))
+    exception: Optional[BaseException] = attrs.field(eq=False, order=False, default=None)
+    return_value: Any = attrs.field(eq=False, order=False, default=None)

     def marshal(self, serializer: abc.Serializer) -> dict[str, Any]:
-        marshalled = attr.asdict(self)
+        marshalled = attrs.asdict(self, value_serializer=serialize)
         if self.outcome is JobOutcome.error:
             marshalled['exception'] = serializer.serialize(self.exception)
         else:
@@ -177,12 +184,12 @@
         return cls(**marshalled)


-@attr.define(kw_only=True, frozen=True)
+@attrs.define(kw_only=True, frozen=True)
 class RetrySettings:
-    stop: tenacity.stop.stop_base = attr.field(validator=instance_of(tenacity.stop.stop_base),
-                                               default=tenacity.stop_after_delay(60))
-    wait: tenacity.wait.wait_base = attr.field(validator=instance_of(tenacity.wait.wait_base),
-                                               default=tenacity.wait_exponential(min=0.5, max=20))
+    stop: tenacity.stop.stop_base = attrs.field(validator=instance_of(tenacity.stop.stop_base),
+                                                default=tenacity.stop_after_delay(60))
+    wait: tenacity.wait.wait_base = attrs.field(validator=instance_of(tenacity.wait.wait_base),
+                                                default=tenacity.wait_exponential(min=0.5, max=20))

     @classmethod
     def fail_immediately(cls) -> RetrySettings:
diff --git a/src/apscheduler/triggers/calendarinterval.py b/src/apscheduler/triggers/calendarinterval.py
index 2c380d3..8d0990d 100644
--- a/src/apscheduler/triggers/calendarinterval.py
+++ b/src/apscheduler/triggers/calendarinterval.py
@@ -3,7 +3,7 @@ from __future__ import annotations
 from datetime import date, datetime, time, timedelta, tzinfo
 from typing import Any, Optional

-import attr
+import attrs

 from ..abc import Trigger
 from ..marshalling import marshal_date, marshal_timezone, unmarshal_date, unmarshal_timezone
@@ -11,7 +11,7 @@ from ..util import timezone_repr
 from ..validators import as_date, as_timezone, require_state_version


-@attr.define(kw_only=True)
+@attrs.define(kw_only=True)
 class CalendarIntervalTrigger(Trigger):
     """
     Runs the task on specified calendar-based intervals always at the same exact time of day.
@@ -63,11 +63,11 @@ class CalendarIntervalTrigger(Trigger):
     hour: int = 0
     minute: int = 0
     second: int = 0
-    start_date: date = attr.field(converter=as_date, factory=date.today)
-    end_date: date | None = attr.field(converter=as_date, default=None)
-    timezone: tzinfo = attr.field(converter=as_timezone, default='local')
-    _time: time = attr.field(init=False, eq=False)
-    _last_fire_date: Optional[date] = attr.field(init=False, eq=False, default=None)
+    start_date: date = attrs.field(converter=as_date, factory=date.today)
+    end_date: date | None = attrs.field(converter=as_date, default=None)
+    timezone: tzinfo = attrs.field(converter=as_timezone, default='local')
+    _time: time = attrs.field(init=False, eq=False)
+    _last_fire_date: Optional[date] = attrs.field(init=False, eq=False, default=None)

     def __attrs_post_init__(self) -> None:
         self._time = time(self.hour, self.minute, self.second, tzinfo=self.timezone)
diff --git a/src/apscheduler/triggers/combining.py b/src/apscheduler/triggers/combining.py
index c719073..fa0b5c9 100644
--- a/src/apscheduler/triggers/combining.py
+++ b/src/apscheduler/triggers/combining.py
@@ -4,7 +4,7 @@ from abc import abstractmethod
 from datetime import datetime, timedelta
 from typing import Any, Optional

-import attr
+import attrs

 from ..abc import Trigger
 from ..exceptions import MaxIterationsReached
@@ -12,10 +12,10 @@ from ..marshalling import marshal_object, unmarshal_object
 from ..validators import as_timedelta, require_state_version


-@attr.define
+@attrs.define
 class BaseCombiningTrigger(Trigger):
     triggers: list[Trigger]
-    _next_fire_times: list[Optional[datetime]] = attr.field(init=False, eq=False, factory=list)
+    _next_fire_times: list[Optional[datetime]] = attrs.field(init=False, eq=False, factory=list)

     def __getstate__(self) -> dict[str, Any]:
         return {
@@ -30,7 +30,7 @@
         self._next_fire_times = state['next_fire_times']


-@attr.define
+@attrs.define
 class AndTrigger(BaseCombiningTrigger):
     """
     Fires on times produced by the enclosed triggers whenever the fire times are within the given
@@ -50,7 +50,7 @@
     :param max_iterations: maximum number of iterations of fire time calculations before giving up
     """

-    threshold: timedelta = attr.field(converter=as_timedelta, default=1)
+    threshold: timedelta = attrs.field(converter=as_timedelta, default=1)
     max_iterations: Optional[int] = 10000

     def next(self) -> Optional[datetime]:
@@ -102,7 +102,7 @@
             f'threshold={self.threshold.total_seconds()}, max_iterations={self.max_iterations})'


-@attr.define
+@attrs.define
 class OrTrigger(BaseCombiningTrigger):
     """
     Fires on every fire time of every trigger in chronological order.
diff --git a/src/apscheduler/triggers/cron/__init__.py b/src/apscheduler/triggers/cron/__init__.py
index 40da147..1c1e582 100644
--- a/src/apscheduler/triggers/cron/__init__.py
+++ b/src/apscheduler/triggers/cron/__init__.py
@@ -3,7 +3,7 @@ from __future__ import annotations
 from datetime import datetime, timedelta, tzinfo
 from typing import Any, ClassVar, Optional, Sequence

-import attr
+import attrs
 from tzlocal import get_localzone

 from ...abc import Trigger
@@ -14,7 +14,7 @@ from .fields import (
     DEFAULT_VALUES, BaseField, DayOfMonthField, DayOfWeekField, MonthField, WeekField)


-@attr.define(kw_only=True)
+@attrs.define(kw_only=True)
 class CronTrigger(Trigger):
     """
     Triggers when current time matches all specified time constraints, similarly to how the UNIX
@@ -55,11 +55,11 @@
     hour: int | str | None = None
     minute: int | str | None = None
     second: int | str | None = None
-    start_time: datetime = attr.field(converter=as_aware_datetime, factory=datetime.now)
+    start_time: datetime = attrs.field(converter=as_aware_datetime, factory=datetime.now)
     end_time: datetime | None = None
-    timezone: tzinfo | str = attr.field(converter=as_timezone, factory=get_localzone)
-    _fields: list[BaseField] = attr.field(init=False, eq=False, factory=list)
-    _last_fire_time: Optional[datetime] = attr.field(init=False, eq=False, default=None)
+    timezone: tzinfo | str = attrs.field(converter=as_timezone, factory=get_localzone)
+    _fields: list[BaseField] = attrs.field(init=False, eq=False, factory=list)
+    _last_fire_time: Optional[datetime] = attrs.field(init=False, eq=False, default=None)

     def __attrs_post_init__(self) -> None:
         self._set_fields([self.year, self.month, self.day, self.week, self.day_of_week, self.hour,
diff --git a/src/apscheduler/triggers/date.py b/src/apscheduler/triggers/date.py
index db06130..08494c3 100644
--- a/src/apscheduler/triggers/date.py
+++ b/src/apscheduler/triggers/date.py
@@ -3,14 +3,14 @@ from __future__ import annotations
 from datetime import datetime
 from typing import Any, Optional

-import attr
+import attrs

 from ..abc import Trigger
 from ..marshalling import marshal_date, unmarshal_date
 from ..validators import as_aware_datetime, require_state_version


-@attr.define
+@attrs.define
 class DateTrigger(Trigger):
     """
     Triggers once on the given date/time.
@@ -18,8 +18,8 @@
     :param run_time: the date/time to run the job at
     """

-    run_time: datetime = attr.field(converter=as_aware_datetime)
-    _completed: bool = attr.field(init=False, eq=False, default=False)
+    run_time: datetime = attrs.field(converter=as_aware_datetime)
+    _completed: bool = attrs.field(init=False, eq=False, default=False)

     def next(self) -> Optional[datetime]:
         if not self._completed:
diff --git a/src/apscheduler/triggers/interval.py b/src/apscheduler/triggers/interval.py
index 1b11095..0f85d36 100644
--- a/src/apscheduler/triggers/interval.py
+++ b/src/apscheduler/triggers/interval.py
@@ -3,14 +3,14 @@ from __future__ import annotations
 from datetime import datetime, timedelta
 from typing import Any, Optional

-import attr
+import attrs

 from ..abc import Trigger
 from ..marshalling import marshal_date, unmarshal_date
 from ..validators import as_aware_datetime, require_state_version


-@attr.define(kw_only=True)
+@attrs.define(kw_only=True)
 class IntervalTrigger(Trigger):
     """
     Triggers on specified intervals.
@@ -36,10 +36,10 @@
     minutes: float = 0
     seconds: float = 0
     microseconds: float = 0
-    start_time: datetime = attr.field(converter=as_aware_datetime, factory=datetime.now)
-    end_time: Optional[datetime] = attr.field(converter=as_aware_datetime, default=None)
-    _interval: timedelta = attr.field(init=False, eq=False, repr=False)
-    _last_fire_time: Optional[datetime] = attr.field(init=False, eq=False, default=None)
+    start_time: datetime = attrs.field(converter=as_aware_datetime, factory=datetime.now)
+    end_time: Optional[datetime] = attrs.field(converter=as_aware_datetime, default=None)
+    _interval: timedelta = attrs.field(init=False, eq=False, repr=False)
+    _last_fire_time: Optional[datetime] = attrs.field(init=False, eq=False, default=None)

     def __attrs_post_init__(self) -> None:
         self._interval = timedelta(weeks=self.weeks, days=self.days, hours=self.hours,
diff --git a/src/apscheduler/validators.py b/src/apscheduler/validators.py
index 95c3747..179eabd 100644
--- a/src/apscheduler/validators.py
+++ b/src/apscheduler/validators.py
@@ -4,8 +4,8 @@ import sys
 from datetime import date, datetime, timedelta, timezone, tzinfo
 from typing import Any, Optional

-import attr
-from attr import Attribute
+import attrs
+from attrs import Attribute
 from tzlocal import get_localzone

 from .abc import Trigger
@@ -166,6 +166,6 @@ def require_state_version(trigger: Trigger, state: dict[str, Any], max_version:
         raise DeserializationError('Missing "version" key in the serialized state') from exc


-def positive_integer(inst, field: attr.Attribute, value) -> None:
+def positive_integer(inst, field: attrs.Attribute, value) -> None:
     if value <= 0:
         raise ValueError(f'{field} must be a positive integer')
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py
index e59f0d9..50f76a9 100644
--- a/src/apscheduler/workers/async_.py
+++ b/src/apscheduler/workers/async_.py
@@ -10,7 +10,7 @@ from typing import Callable, Optional
 from uuid import UUID

 import anyio
-import attr
+import attrs
 from anyio import TASK_STATUS_IGNORED, create_task_group, get_cancelled_exc_class, move_on_after
 from anyio.abc import CancelScope
@@ -24,20 +24,20 @@ from ..structures import JobInfo, JobResult
 from ..validators import positive_integer


-@attr.define(eq=False)
+@attrs.define(eq=False)
 class AsyncWorker:
     """Runs jobs locally in a task group."""

-    data_store: AsyncDataStore = attr.field(converter=as_async_datastore)
-    max_concurrent_jobs: int = attr.field(kw_only=True, validator=positive_integer, default=100)
-    identity: str = attr.field(kw_only=True, default=None)
-    logger: Optional[Logger] = attr.field(kw_only=True, default=getLogger(__name__))
-
-    _state: RunState = attr.field(init=False, default=RunState.stopped)
-    _wakeup_event: anyio.Event = attr.field(init=False, factory=anyio.Event)
-    _acquired_jobs: set[Job] = attr.field(init=False, factory=set)
-    _events: LocalAsyncEventBroker = attr.field(init=False, factory=LocalAsyncEventBroker)
-    _running_jobs: set[UUID] = attr.field(init=False, factory=set)
-    _exit_stack: AsyncExitStack = attr.field(init=False)
+    data_store: AsyncDataStore = attrs.field(converter=as_async_datastore)
+    max_concurrent_jobs: int = attrs.field(kw_only=True, validator=positive_integer, default=100)
+    identity: str = attrs.field(kw_only=True, default=None)
+    logger: Optional[Logger] = attrs.field(kw_only=True, default=getLogger(__name__))
+
+    _state: RunState = attrs.field(init=False, default=RunState.stopped)
+    _wakeup_event: anyio.Event = attrs.field(init=False, factory=anyio.Event)
+    _acquired_jobs: set[Job] = attrs.field(init=False, factory=set)
+    _events: LocalAsyncEventBroker = attrs.field(init=False, factory=LocalAsyncEventBroker)
+    _running_jobs: set[UUID] = attrs.field(init=False, factory=set)
+    _exit_stack: AsyncExitStack = attrs.field(init=False)

     def __attrs_post_init__(self) -> None:
         if not self.identity:
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index 6bac3ca..69718b5 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -11,7 +11,7 @@ from logging import Logger, getLogger
 from typing import Callable, Optional
 from uuid import UUID

-import attr
+import attrs

 from ..abc import DataStore, EventSource
 from ..context import current_worker, job_info
@@ -22,21 +22,21 @@ from ..structures import Job, JobInfo, JobResult
 from ..validators import positive_integer


-@attr.define(eq=False)
+@attrs.define(eq=False)
 class Worker:
     """Runs jobs locally in a thread pool."""

     data_store: DataStore
-    max_concurrent_jobs: int = attr.field(kw_only=True, validator=positive_integer, default=20)
-    identity: str = attr.field(kw_only=True, default=None)
-    logger: Optional[Logger] = attr.field(kw_only=True, default=getLogger(__name__))
-
-    _state: RunState = attr.field(init=False, default=RunState.stopped)
-    _wakeup_event: threading.Event = attr.field(init=False)
-    _acquired_jobs: set[Job] = attr.field(init=False, factory=set)
-    _events: LocalEventBroker = attr.field(init=False, factory=LocalEventBroker)
-    _running_jobs: set[UUID] = attr.field(init=False, factory=set)
-    _exit_stack: ExitStack = attr.field(init=False)
-    _executor: ThreadPoolExecutor = attr.field(init=False)
+    max_concurrent_jobs: int = attrs.field(kw_only=True, validator=positive_integer, default=20)
+    identity: str = attrs.field(kw_only=True, default=None)
+    logger: Optional[Logger] = attrs.field(kw_only=True, default=getLogger(__name__))
+
+    _state: RunState = attrs.field(init=False, default=RunState.stopped)
+    _wakeup_event: threading.Event = attrs.field(init=False)
+    _acquired_jobs: set[Job] = attrs.field(init=False, factory=set)
+    _events: LocalEventBroker = attrs.field(init=False, factory=LocalEventBroker)
+    _running_jobs: set[UUID] = attrs.field(init=False, factory=set)
+    _exit_stack: ExitStack = attrs.field(init=False)
+    _executor: ThreadPoolExecutor = attrs.field(init=False)

     def __attrs_post_init__(self) -> None:
         if not self.identity:
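Beyond the rename, the one behavioral addition in this commit is the module-level serialize() hook in structures.py: the marshal() methods now call attrs.asdict(self, value_serializer=serialize), so frozenset-valued fields such as tags are emitted as plain lists that the JSON, CBOR, and BSON encoders can handle. A minimal sketch of the mechanism, with serialize() copied from the diff and a hypothetical Record class standing in for Task/Schedule/Job:

    import attrs

    def serialize(inst, field, value):
        # attrs invokes this hook for every attribute value during asdict()
        if isinstance(value, frozenset):
            return list(value)

        return value

    @attrs.define(kw_only=True)
    class Record:  # hypothetical stand-in; not part of APScheduler
        tags: frozenset[str] = attrs.field(converter=frozenset, default=())

    print(attrs.asdict(Record(tags=['a']), value_serializer=serialize))
    # {'tags': ['a']} - a list instead of a frozenset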