diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-06 00:10:34 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-06 01:39:07 +0300 |
commit | da94af0571e6c40553b4463b5fba1d94f598b136 (patch) | |
tree | 9becae5076b7cfa75224c686a6efc1a2d364f301 | |
parent | 148b29270eb8fa0974f29be4d85a0ee03b848d1a (diff) | |
download | apscheduler-da94af0571e6c40553b4463b5fba1d94f598b136.tar.gz |
Added missing TaskUpdated event
-rw-r--r-- | src/apscheduler/datastores/async_/sqlalchemy.py | 7 | ||||
-rw-r--r-- | src/apscheduler/datastores/sync/memory.py | 8 | ||||
-rw-r--r-- | src/apscheduler/datastores/sync/mongodb.py | 9 | ||||
-rw-r--r-- | src/apscheduler/datastores/sync/sqlalchemy.py | 7 | ||||
-rw-r--r-- | src/apscheduler/events.py | 5 | ||||
-rw-r--r-- | tests/test_datastores.py | 9 |
6 files changed, 32 insertions, 13 deletions
diff --git a/src/apscheduler/datastores/async_/sqlalchemy.py b/src/apscheduler/datastores/async_/sqlalchemy.py index 3e573b1..c3d1ff1 100644 --- a/src/apscheduler/datastores/async_/sqlalchemy.py +++ b/src/apscheduler/datastores/async_/sqlalchemy.py @@ -27,7 +27,7 @@ from ...enums import ConflictPolicy from ...events import ( AsyncEventHub, DataStoreEvent, Event, JobAdded, JobDeserializationFailed, ScheduleAdded, ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded, - TaskRemoved) + TaskRemoved, TaskUpdated) from ...exceptions import ConflictingIdError, SerializationError, TaskLookupError from ...marshalling import callable_to_ref from ...serializers.pickle import PickleSerializer @@ -282,8 +282,9 @@ class SQLAlchemyDataStore(AsyncDataStore): where(self.t_tasks.c.id == task.id) async with self.engine.begin() as conn: await conn.execute(update) - - self._events.publish(TaskAdded(task_id=task.id)) + self._events.publish(TaskUpdated(task_id=task.id)) + else: + self._events.publish(TaskAdded(task_id=task.id)) async def remove_task(self, task_id: str) -> None: delete = self.t_tasks.delete().where(self.t_tasks.c.id == task_id) diff --git a/src/apscheduler/datastores/sync/memory.py b/src/apscheduler/datastores/sync/memory.py index 16c421b..c2a033e 100644 --- a/src/apscheduler/datastores/sync/memory.py +++ b/src/apscheduler/datastores/sync/memory.py @@ -14,7 +14,7 @@ from ...abc import DataStore, Job, Schedule from ...enums import ConflictPolicy from ...events import ( EventHub, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, SubscriptionToken, - TaskAdded, TaskRemoved) + TaskAdded, TaskRemoved, TaskUpdated) from ...exceptions import ConflictingIdError, TaskLookupError from ...structures import JobResult, Task from ...util import reentrant @@ -114,8 +114,12 @@ class MemoryDataStore(DataStore): if ids is None or state.schedule.id in ids] def add_task(self, task: Task) -> None: + task_exists = task.id in self._tasks self._tasks[task.id] = TaskState(task) - self._events.publish(TaskAdded(task_id=task.id)) + if task_exists: + self._events.publish(TaskUpdated(task_id=task.id)) + else: + self._events.publish(TaskAdded(task_id=task.id)) def remove_task(self, task_id: str) -> None: try: diff --git a/src/apscheduler/datastores/sync/mongodb.py b/src/apscheduler/datastores/sync/mongodb.py index 2d25dbf..50a5d15 100644 --- a/src/apscheduler/datastores/sync/mongodb.py +++ b/src/apscheduler/datastores/sync/mongodb.py @@ -18,7 +18,7 @@ from ...abc import DataStore, Job, Schedule, Serializer from ...enums import ConflictPolicy from ...events import ( DataStoreEvent, EventHub, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, - SubscriptionToken, TaskAdded, TaskRemoved) + SubscriptionToken, TaskAdded, TaskRemoved, TaskUpdated) from ...exceptions import ( ConflictingIdError, DeserializationError, SerializationError, TaskLookupError) from ...serializers.pickle import PickleSerializer @@ -93,14 +93,17 @@ class MongoDBDataStore(DataStore): self._events.unsubscribe(token) def add_task(self, task: Task) -> None: - self._tasks.find_one_and_update( + previous = self._tasks.find_one_and_update( {'_id': task.id}, {'$set': task.marshal(self.serializer), '$setOnInsert': {'running_jobs': 0}}, upsert=True ) self._local_tasks[task.id] = task - self._events.publish(TaskAdded(task_id=task.id)) + if previous: + self._events.publish(TaskUpdated(task_id=task.id)) + else: + self._events.publish(TaskAdded(task_id=task.id)) def remove_task(self, task_id: str) -> None: if not self._tasks.find_one_and_delete({'_id': task_id}): diff --git a/src/apscheduler/datastores/sync/sqlalchemy.py b/src/apscheduler/datastores/sync/sqlalchemy.py index 07a7628..2465b36 100644 --- a/src/apscheduler/datastores/sync/sqlalchemy.py +++ b/src/apscheduler/datastores/sync/sqlalchemy.py @@ -19,7 +19,7 @@ from ...enums import ConflictPolicy from ...events import ( Event, EventHub, JobAdded, JobDeserializationFailed, ScheduleAdded, ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded, - TaskRemoved) + TaskRemoved, TaskUpdated) from ...exceptions import ConflictingIdError, SerializationError, TaskLookupError from ...marshalling import callable_to_ref from ...serializers.pickle import PickleSerializer @@ -195,8 +195,9 @@ class SQLAlchemyDataStore(DataStore): where(self.t_tasks.c.id == task.id) with self.engine.begin() as conn: conn.execute(update) - - self._events.publish(TaskAdded(task_id=task.id)) + self._events.publish(TaskUpdated(task_id=task.id)) + else: + self._events.publish(TaskAdded(task_id=task.id)) def remove_task(self, task_id: str) -> None: delete = self.t_tasks.delete().where(self.t_tasks.c.id == task_id) diff --git a/src/apscheduler/events.py b/src/apscheduler/events.py index 47799cb..d159102 100644 --- a/src/apscheduler/events.py +++ b/src/apscheduler/events.py @@ -50,6 +50,11 @@ class TaskAdded(DataStoreEvent): @attr.define(kw_only=True, frozen=True) +class TaskUpdated(DataStoreEvent): + task_id: str + + +@attr.define(kw_only=True, frozen=True) class TaskRemoved(DataStoreEvent): task_id: str diff --git a/tests/test_datastores.py b/tests/test_datastores.py index b4342d5..5ce7bcd 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -10,7 +10,8 @@ from freezegun.api import FrozenDateTimeFactory from apscheduler.abc import AsyncDataStore, Job, Schedule from apscheduler.enums import CoalescePolicy, ConflictPolicy, JobOutcome -from apscheduler.events import Event, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded +from apscheduler.events import ( + Event, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskUpdated) from apscheduler.structures import JobResult, Task from apscheduler.triggers.date import DateTrigger @@ -56,7 +57,8 @@ class TestAsyncStores: self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None: import math - async with datastore_cm as store, capture_events(store, 3, {TaskAdded}) as events: + event_types = {TaskAdded, TaskUpdated} + async with datastore_cm as store, capture_events(store, 3, event_types) as events: await store.add_task(Task(id='test_task', func=print)) await store.add_task(Task(id='test_task2', func=math.ceil)) await store.add_task(Task(id='test_task', func=repr)) @@ -69,12 +71,15 @@ class TestAsyncStores: assert tasks[1].func is math.ceil received_event = events.pop(0) + assert isinstance(received_event, TaskAdded) assert received_event.task_id == 'test_task' received_event = events.pop(0) + assert isinstance(received_event, TaskAdded) assert received_event.task_id == 'test_task2' received_event = events.pop(0) + assert isinstance(received_event, TaskUpdated) assert received_event.task_id == 'test_task' assert not events |