summaryrefslogtreecommitdiff
path: root/src/apscheduler/schedulers/sync.py
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-21 18:30:31 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-22 23:55:40 +0300
commit310652119957194d3c8cac91bf6bf171b647a103 (patch)
tree045878b164e986f2deae3bda3982473d9d03ce3c /src/apscheduler/schedulers/sync.py
parent191a9663c6fd2c65f7b524c59285dec5ac747ee7 (diff)
downloadapscheduler-310652119957194d3c8cac91bf6bf171b647a103.tar.gz
Refactored scheduler and worker classes to use attrs
Diffstat (limited to 'src/apscheduler/schedulers/sync.py')
-rw-r--r--src/apscheduler/schedulers/sync.py35
1 files changed, 21 insertions, 14 deletions
diff --git a/src/apscheduler/schedulers/sync.py b/src/apscheduler/schedulers/sync.py
index 2af4f8e..c49359c 100644
--- a/src/apscheduler/schedulers/sync.py
+++ b/src/apscheduler/schedulers/sync.py
@@ -11,6 +11,8 @@ from logging import Logger, getLogger
from typing import Any, Callable, Iterable, Mapping, Optional
from uuid import UUID, uuid4
+import attr
+
from ..abc import DataStore, EventSource, Trigger
from ..context import current_scheduler
from ..datastores.memory import MemoryDataStore
@@ -27,22 +29,24 @@ _microsecond_delta = timedelta(microseconds=1)
_zero_timedelta = timedelta()
+@attr.define(eq=False)
class Scheduler:
"""A synchronous scheduler implementation."""
- _state: RunState = RunState.stopped
- _wakeup_event: threading.Event
- _worker: Optional[Worker] = None
+ data_store: DataStore = attr.field(factory=MemoryDataStore)
+ identity: str = attr.field(kw_only=True, default=None)
+ start_worker: bool = attr.field(kw_only=True, default=True)
+ logger: Optional[Logger] = attr.field(kw_only=True, default=getLogger(__name__))
- def __init__(self, data_store: Optional[DataStore] = None, *, identity: Optional[str] = None,
- logger: Optional[Logger] = None, start_worker: bool = True):
- self.identity = identity or f'{platform.node()}-{os.getpid()}-{id(self)}'
- self.logger = logger or getLogger(__name__)
- self.start_worker = start_worker
- self.data_store = data_store or MemoryDataStore()
- self._exit_stack = ExitStack()
- self._executor = ThreadPoolExecutor(max_workers=1)
- self._events = LocalEventBroker()
+ _state: RunState = attr.field(init=False, default=RunState.stopped)
+ _wakeup_event: threading.Event = attr.field(init=False)
+ _worker: Optional[Worker] = attr.field(init=False, default=None)
+ _events: LocalEventBroker = attr.field(init=False, factory=LocalEventBroker)
+ _exit_stack: ExitStack = attr.field(init=False)
+
+ def __attrs_post_init__(self) -> None:
+ if not self.identity:
+ self.identity = f'{platform.node()}-{os.getpid()}-{id(self)}'
@property
def events(self) -> EventSource:
@@ -59,6 +63,7 @@ class Scheduler:
def __enter__(self) -> Scheduler:
self._state = RunState.starting
self._wakeup_event = threading.Event()
+ self._exit_stack = ExitStack()
self._exit_stack.__enter__()
self._exit_stack.enter_context(self._events)
@@ -85,7 +90,9 @@ class Scheduler:
# Start the scheduler and return when it has signalled readiness or raised an exception
start_future: Future[Event] = Future()
with self._events.subscribe(start_future.set_result, one_shot=True):
- run_future = self._executor.submit(self.run)
+ executor = ThreadPoolExecutor(1)
+ self._exit_stack.push(lambda exc_type, *args: executor.shutdown(wait=exc_type is None))
+ run_future = executor.submit(self.run)
wait([start_future, run_future], return_when=FIRST_COMPLETED)
if run_future.done():
@@ -96,8 +103,8 @@ class Scheduler:
def __exit__(self, exc_type, exc_val, exc_tb):
self._state = RunState.stopping
self._wakeup_event.set()
- self._executor.shutdown(wait=exc_type is None)
self._exit_stack.__exit__(exc_type, exc_val, exc_tb)
+ self._state = RunState.stopped
del self._wakeup_event
def add_schedule(