diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-21 18:30:31 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-22 23:55:40 +0300 |
commit | 310652119957194d3c8cac91bf6bf171b647a103 (patch) | |
tree | 045878b164e986f2deae3bda3982473d9d03ce3c /src/apscheduler/schedulers/sync.py | |
parent | 191a9663c6fd2c65f7b524c59285dec5ac747ee7 (diff) | |
download | apscheduler-310652119957194d3c8cac91bf6bf171b647a103.tar.gz |
Refactored scheduler and worker classes to use attrs
Diffstat (limited to 'src/apscheduler/schedulers/sync.py')
-rw-r--r-- | src/apscheduler/schedulers/sync.py | 35 |
1 files changed, 21 insertions, 14 deletions
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py index 2af4f8e..c49359c 100644 --- a/src/apscheduler/schedulers/sync.py +++ b/src/apscheduler/schedulers/sync.py @@ -11,6 +11,8 @@ from logging import Logger, getLogger from typing import Any, Callable, Iterable, Mapping, Optional from uuid import UUID, uuid4 +import attr + from ..abc import DataStore, EventSource, Trigger from ..context import current_scheduler from ..datastores.memory import MemoryDataStore @@ -27,22 +29,24 @@ _microsecond_delta = timedelta(microseconds=1) _zero_timedelta = timedelta() +@attr.define(eq=False) class Scheduler: """A synchronous scheduler implementation.""" - _state: RunState = RunState.stopped - _wakeup_event: threading.Event - _worker: Optional[Worker] = None + data_store: DataStore = attr.field(factory=MemoryDataStore) + identity: str = attr.field(kw_only=True, default=None) + start_worker: bool = attr.field(kw_only=True, default=True) + logger: Optional[Logger] = attr.field(kw_only=True, default=getLogger(__name__)) - def __init__(self, data_store: Optional[DataStore] = None, *, identity: Optional[str] = None, - logger: Optional[Logger] = None, start_worker: bool = True): - self.identity = identity or f'{platform.node()}-{os.getpid()}-{id(self)}' - self.logger = logger or getLogger(__name__) - self.start_worker = start_worker - self.data_store = data_store or MemoryDataStore() - self._exit_stack = ExitStack() - self._executor = ThreadPoolExecutor(max_workers=1) - self._events = LocalEventBroker() + _state: RunState = attr.field(init=False, default=RunState.stopped) + _wakeup_event: threading.Event = attr.field(init=False) + _worker: Optional[Worker] = attr.field(init=False, default=None) + _events: LocalEventBroker = attr.field(init=False, factory=LocalEventBroker) + _exit_stack: ExitStack = attr.field(init=False) + + def __attrs_post_init__(self) -> None: + if not self.identity: + self.identity = f'{platform.node()}-{os.getpid()}-{id(self)}' @property def events(self) -> EventSource: @@ -59,6 +63,7 @@ class Scheduler: def __enter__(self) -> Scheduler: self._state = RunState.starting self._wakeup_event = threading.Event() + self._exit_stack = ExitStack() self._exit_stack.__enter__() self._exit_stack.enter_context(self._events) @@ -85,7 +90,9 @@ class Scheduler: # Start the scheduler and return when it has signalled readiness or raised an exception start_future: Future[Event] = Future() with self._events.subscribe(start_future.set_result, one_shot=True): - run_future = self._executor.submit(self.run) + executor = ThreadPoolExecutor(1) + self._exit_stack.push(lambda exc_type, *args: executor.shutdown(wait=exc_type is None)) + run_future = executor.submit(self.run) wait([start_future, run_future], return_when=FIRST_COMPLETED) if run_future.done(): @@ -96,8 +103,8 @@ class Scheduler: def __exit__(self, exc_type, exc_val, exc_tb): self._state = RunState.stopping self._wakeup_event.set() - self._executor.shutdown(wait=exc_type is None) self._exit_stack.__exit__(exc_type, exc_val, exc_tb) + self._state = RunState.stopped del self._wakeup_event def add_schedule( |