summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-06 00:10:34 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-06 01:39:07 +0300
commitda94af0571e6c40553b4463b5fba1d94f598b136 (patch)
tree9becae5076b7cfa75224c686a6efc1a2d364f301
parent148b29270eb8fa0974f29be4d85a0ee03b848d1a (diff)
downloadapscheduler-da94af0571e6c40553b4463b5fba1d94f598b136.tar.gz
Added missing TaskUpdated event
-rw-r--r--src/apscheduler/datastores/async_/sqlalchemy.py7
-rw-r--r--src/apscheduler/datastores/sync/memory.py8
-rw-r--r--src/apscheduler/datastores/sync/mongodb.py9
-rw-r--r--src/apscheduler/datastores/sync/sqlalchemy.py7
-rw-r--r--src/apscheduler/events.py5
-rw-r--r--tests/test_datastores.py9
6 files changed, 32 insertions, 13 deletions
diff --git a/src/apscheduler/datastores/async_/sqlalchemy.py b/src/apscheduler/datastores/async_/sqlalchemy.py
index 3e573b1..c3d1ff1 100644
--- a/src/apscheduler/datastores/async_/sqlalchemy.py
+++ b/src/apscheduler/datastores/async_/sqlalchemy.py
@@ -27,7 +27,7 @@ from ...enums import ConflictPolicy
from ...events import (
AsyncEventHub, DataStoreEvent, Event, JobAdded, JobDeserializationFailed, ScheduleAdded,
ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded,
- TaskRemoved)
+ TaskRemoved, TaskUpdated)
from ...exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ...marshalling import callable_to_ref
from ...serializers.pickle import PickleSerializer
@@ -282,8 +282,9 @@ class SQLAlchemyDataStore(AsyncDataStore):
where(self.t_tasks.c.id == task.id)
async with self.engine.begin() as conn:
await conn.execute(update)
-
- self._events.publish(TaskAdded(task_id=task.id))
+ self._events.publish(TaskUpdated(task_id=task.id))
+ else:
+ self._events.publish(TaskAdded(task_id=task.id))
async def remove_task(self, task_id: str) -> None:
delete = self.t_tasks.delete().where(self.t_tasks.c.id == task_id)
diff --git a/src/apscheduler/datastores/sync/memory.py b/src/apscheduler/datastores/sync/memory.py
index 16c421b..c2a033e 100644
--- a/src/apscheduler/datastores/sync/memory.py
+++ b/src/apscheduler/datastores/sync/memory.py
@@ -14,7 +14,7 @@ from ...abc import DataStore, Job, Schedule
from ...enums import ConflictPolicy
from ...events import (
EventHub, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, SubscriptionToken,
- TaskAdded, TaskRemoved)
+ TaskAdded, TaskRemoved, TaskUpdated)
from ...exceptions import ConflictingIdError, TaskLookupError
from ...structures import JobResult, Task
from ...util import reentrant
@@ -114,8 +114,12 @@ class MemoryDataStore(DataStore):
if ids is None or state.schedule.id in ids]
def add_task(self, task: Task) -> None:
+ task_exists = task.id in self._tasks
self._tasks[task.id] = TaskState(task)
- self._events.publish(TaskAdded(task_id=task.id))
+ if task_exists:
+ self._events.publish(TaskUpdated(task_id=task.id))
+ else:
+ self._events.publish(TaskAdded(task_id=task.id))
def remove_task(self, task_id: str) -> None:
try:
diff --git a/src/apscheduler/datastores/sync/mongodb.py b/src/apscheduler/datastores/sync/mongodb.py
index 2d25dbf..50a5d15 100644
--- a/src/apscheduler/datastores/sync/mongodb.py
+++ b/src/apscheduler/datastores/sync/mongodb.py
@@ -18,7 +18,7 @@ from ...abc import DataStore, Job, Schedule, Serializer
from ...enums import ConflictPolicy
from ...events import (
DataStoreEvent, EventHub, JobAdded, ScheduleAdded, ScheduleRemoved, ScheduleUpdated,
- SubscriptionToken, TaskAdded, TaskRemoved)
+ SubscriptionToken, TaskAdded, TaskRemoved, TaskUpdated)
from ...exceptions import (
ConflictingIdError, DeserializationError, SerializationError, TaskLookupError)
from ...serializers.pickle import PickleSerializer
@@ -93,14 +93,17 @@ class MongoDBDataStore(DataStore):
self._events.unsubscribe(token)
def add_task(self, task: Task) -> None:
- self._tasks.find_one_and_update(
+ previous = self._tasks.find_one_and_update(
{'_id': task.id},
{'$set': task.marshal(self.serializer),
'$setOnInsert': {'running_jobs': 0}},
upsert=True
)
self._local_tasks[task.id] = task
- self._events.publish(TaskAdded(task_id=task.id))
+ if previous:
+ self._events.publish(TaskUpdated(task_id=task.id))
+ else:
+ self._events.publish(TaskAdded(task_id=task.id))
def remove_task(self, task_id: str) -> None:
if not self._tasks.find_one_and_delete({'_id': task_id}):
diff --git a/src/apscheduler/datastores/sync/sqlalchemy.py b/src/apscheduler/datastores/sync/sqlalchemy.py
index 07a7628..2465b36 100644
--- a/src/apscheduler/datastores/sync/sqlalchemy.py
+++ b/src/apscheduler/datastores/sync/sqlalchemy.py
@@ -19,7 +19,7 @@ from ...enums import ConflictPolicy
from ...events import (
Event, EventHub, JobAdded, JobDeserializationFailed, ScheduleAdded,
ScheduleDeserializationFailed, ScheduleRemoved, ScheduleUpdated, SubscriptionToken, TaskAdded,
- TaskRemoved)
+ TaskRemoved, TaskUpdated)
from ...exceptions import ConflictingIdError, SerializationError, TaskLookupError
from ...marshalling import callable_to_ref
from ...serializers.pickle import PickleSerializer
@@ -195,8 +195,9 @@ class SQLAlchemyDataStore(DataStore):
where(self.t_tasks.c.id == task.id)
with self.engine.begin() as conn:
conn.execute(update)
-
- self._events.publish(TaskAdded(task_id=task.id))
+ self._events.publish(TaskUpdated(task_id=task.id))
+ else:
+ self._events.publish(TaskAdded(task_id=task.id))
def remove_task(self, task_id: str) -> None:
delete = self.t_tasks.delete().where(self.t_tasks.c.id == task_id)
diff --git a/src/apscheduler/events.py b/src/apscheduler/events.py
index 47799cb..d159102 100644
--- a/src/apscheduler/events.py
+++ b/src/apscheduler/events.py
@@ -50,6 +50,11 @@ class TaskAdded(DataStoreEvent):
@attr.define(kw_only=True, frozen=True)
+class TaskUpdated(DataStoreEvent):
+ task_id: str
+
+
+@attr.define(kw_only=True, frozen=True)
class TaskRemoved(DataStoreEvent):
task_id: str
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index b4342d5..5ce7bcd 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -10,7 +10,8 @@ from freezegun.api import FrozenDateTimeFactory
from apscheduler.abc import AsyncDataStore, Job, Schedule
from apscheduler.enums import CoalescePolicy, ConflictPolicy, JobOutcome
-from apscheduler.events import Event, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded
+from apscheduler.events import (
+ Event, ScheduleAdded, ScheduleRemoved, ScheduleUpdated, TaskAdded, TaskUpdated)
from apscheduler.structures import JobResult, Task
from apscheduler.triggers.date import DateTrigger
@@ -56,7 +57,8 @@ class TestAsyncStores:
self, datastore_cm: AsyncContextManager[AsyncDataStore]) -> None:
import math
- async with datastore_cm as store, capture_events(store, 3, {TaskAdded}) as events:
+ event_types = {TaskAdded, TaskUpdated}
+ async with datastore_cm as store, capture_events(store, 3, event_types) as events:
await store.add_task(Task(id='test_task', func=print))
await store.add_task(Task(id='test_task2', func=math.ceil))
await store.add_task(Task(id='test_task', func=repr))
@@ -69,12 +71,15 @@ class TestAsyncStores:
assert tasks[1].func is math.ceil
received_event = events.pop(0)
+ assert isinstance(received_event, TaskAdded)
assert received_event.task_id == 'test_task'
received_event = events.pop(0)
+ assert isinstance(received_event, TaskAdded)
assert received_event.task_id == 'test_task2'
received_event = events.pop(0)
+ assert isinstance(received_event, TaskUpdated)
assert received_event.task_id == 'test_task'
assert not events