summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 00:11:45 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-12 00:11:45 +0300
commit59ea7376985ef2c8b8b6b6d6df6b1b3be958480c (patch)
tree8d3dbaf0bbe125d563ef43bdfead6a230507e29e
parent89fab3daa678a78c6db3f831d73d7e40e77f184f (diff)
downloadapscheduler-59ea7376985ef2c8b8b6b6d6df6b1b3be958480c.tar.gz
Reduced the number of job completion events
-rw-r--r--src/apscheduler/enums.py2
-rw-r--r--src/apscheduler/events.py64
-rw-r--r--src/apscheduler/structures.py2
-rw-r--r--src/apscheduler/workers/async_.py15
-rw-r--r--src/apscheduler/workers/sync.py12
-rw-r--r--tests/test_datastores.py4
-rw-r--r--tests/test_workers.py30
7 files changed, 42 insertions, 87 deletions
diff --git a/src/apscheduler/enums.py b/src/apscheduler/enums.py
index f9300de..5a0929c 100644
--- a/src/apscheduler/enums.py
+++ b/src/apscheduler/enums.py
@@ -10,7 +10,7 @@ class RunState(Enum):
class JobOutcome(Enum):
success = auto()
- failure = auto()
+ error = auto()
missed_start_deadline = auto()
cancelled = auto()
expired = auto()
diff --git a/src/apscheduler/events.py b/src/apscheduler/events.py
index 49703d9..cd63ea3 100644
--- a/src/apscheduler/events.py
+++ b/src/apscheduler/events.py
@@ -2,7 +2,6 @@ from __future__ import annotations
from datetime import datetime, timezone
from functools import partial
-from traceback import format_tb
from typing import Any, Callable, NewType, Optional
from uuid import UUID
@@ -10,6 +9,7 @@ import attr
from attr.converters import optional
from .converters import as_aware_datetime, as_uuid
+from .enums import JobOutcome
from .structures import Job
SubscriptionToken = NewType('SubscriptionToken', object)
@@ -136,7 +136,7 @@ class JobExecutionEvent(WorkerEvent):
@attr.define(kw_only=True, frozen=True)
class JobStarted(JobExecutionEvent):
- """Signals that a worker has started running a job."""
+ """Signals that a worker has started processing a job."""
@classmethod
def from_job(cls, job: Job, start_time: datetime) -> JobStarted:
@@ -147,62 +147,18 @@ class JobStarted(JobExecutionEvent):
@attr.define(kw_only=True, frozen=True)
-class JobDeadlineMissed(JobExecutionEvent):
- """Signals that a worker has skipped a job because its deadline was missed."""
+class JobEnded(JobExecutionEvent):
+ """Signals that a worker has finished processing of a job."""
+
+ outcome: JobOutcome
+ start_time: datetime = attr.field(converter=as_aware_datetime)
@classmethod
- def from_job(cls, job: Job, start_time: datetime) -> JobDeadlineMissed:
- return JobDeadlineMissed(
+ def from_job(cls, job: Job, outcome: JobOutcome, start_time: datetime) -> JobEnded:
+ return JobEnded(
timestamp=datetime.now(timezone.utc), job_id=job.id, task_id=job.task_id,
schedule_id=job.schedule_id, scheduled_fire_time=job.scheduled_fire_time,
- start_deadline=job.start_deadline)
-
-
-@attr.define(kw_only=True, frozen=True)
-class JobCompleted(JobExecutionEvent):
- """Signals that a worker has successfully run a job."""
- start_time: datetime = attr.field(converter=optional(as_aware_datetime))
- return_value: str
-
- @classmethod
- def from_retval(cls, job: Job, start_time: datetime, return_value: Any) -> JobCompleted:
- return JobCompleted(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id,
- scheduled_fire_time=job.scheduled_fire_time, start_time=start_time,
- start_deadline=job.start_deadline, return_value=repr(return_value))
-
-
-@attr.define(kw_only=True, frozen=True)
-class JobCancelled(JobExecutionEvent):
- """Signals that a job was cancelled."""
- start_time: datetime = attr.field(converter=optional(as_aware_datetime))
-
- @classmethod
- def from_job(cls, job: Job, start_time: datetime) -> JobCancelled:
- return JobCancelled(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id,
- scheduled_fire_time=job.scheduled_fire_time, start_time=start_time,
- start_deadline=job.start_deadline)
-
-
-@attr.define(kw_only=True, frozen=True)
-class JobFailed(JobExecutionEvent):
- """Signals that a worker encountered an exception while running a job."""
- start_time: datetime
- exc_type: str
- exc_val: str
- exc_tb: str
-
- @classmethod
- def from_exception(cls, job: Job, start_time: datetime, exception: BaseException) -> JobFailed:
- if exception.__class__.__module__ == 'builtins':
- exc_type = exception.__class__.__qualname__
- else:
- exc_type = f'{exception.__class__.__module__}.{exception.__class__.__qualname__}'
-
- return JobFailed(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id,
- scheduled_fire_time=job.scheduled_fire_time, start_time=start_time,
- start_deadline=job.start_deadline, exc_type=exc_type,
- exc_val=str(exception),
- exc_tb='\n'.join(format_tb(exception.__traceback__)))
+ start_deadline=job.start_deadline, outcome=outcome, start_time=start_time)
#
diff --git a/src/apscheduler/structures.py b/src/apscheduler/structures.py
index bafb61c..4d1bc18 100644
--- a/src/apscheduler/structures.py
+++ b/src/apscheduler/structures.py
@@ -133,7 +133,7 @@ class JobResult:
def marshal(self, serializer: abc.Serializer) -> dict[str, Any]:
marshalled = attr.asdict(self)
marshalled['outcome'] = self.outcome.name
- if self.outcome is JobOutcome.failure:
+ if self.outcome is JobOutcome.error:
marshalled['exception'] = serializer.serialize(self.exception)
else:
del marshalled['exception']
diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py
index 0893a98..7acef67 100644
--- a/src/apscheduler/workers/async_.py
+++ b/src/apscheduler/workers/async_.py
@@ -18,8 +18,7 @@ from ..datastores.async_adapter import AsyncDataStoreAdapter
from ..enums import JobOutcome, RunState
from ..eventbrokers.async_local import LocalAsyncEventBroker
from ..events import (
- Event, JobAdded, JobCancelled, JobCompleted, JobDeadlineMissed, JobFailed, JobStarted,
- SubscriptionToken, WorkerStarted, WorkerStopped)
+ Event, JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped)
from ..structures import JobResult
@@ -129,7 +128,8 @@ class AsyncWorker(EventSource):
# Check if the job started before the deadline
start_time = datetime.now(timezone.utc)
if job.start_deadline is not None and start_time > job.start_deadline:
- await self._events.publish(JobDeadlineMissed.from_job(job, start_time))
+ await self._events.publish(
+ JobEnded.from_job(job, JobOutcome.missed_start_deadline, start_time))
return
await self._events.publish(JobStarted.from_job(job, start_time))
@@ -143,17 +143,18 @@ class AsyncWorker(EventSource):
await self.data_store.release_job(self.identity, job.task_id, result)
with move_on_after(1, shield=True):
- await self._events.publish(JobCancelled.from_job(job, start_time))
+ await self._events.publish(
+ JobEnded.from_job(job, JobOutcome.cancelled, start_time))
except BaseException as exc:
- result = JobResult(job_id=job.id, outcome=JobOutcome.failure, exception=exc)
+ result = JobResult(job_id=job.id, outcome=JobOutcome.error, exception=exc)
await self.data_store.release_job(self.identity, job.task_id, result)
- await self._events.publish(JobFailed.from_exception(job, start_time, exc))
+ await self._events.publish(JobEnded.from_job(job, JobOutcome.error, start_time))
if not isinstance(exc, Exception):
raise
else:
result = JobResult(job_id=job.id, outcome=JobOutcome.success, return_value=retval)
await self.data_store.release_job(self.identity, job.task_id, result)
- await self._events.publish(JobCompleted.from_retval(job, start_time, retval))
+ await self._events.publish(JobEnded.from_job(job, JobOutcome.success, start_time))
finally:
self._running_jobs.remove(job.id)
diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py
index 30db6bf..45f0c5f 100644
--- a/src/apscheduler/workers/sync.py
+++ b/src/apscheduler/workers/sync.py
@@ -15,8 +15,7 @@ from ..abc import DataStore, EventSource
from ..enums import JobOutcome, RunState
from ..eventbrokers.local import LocalEventBroker
from ..events import (
- JobAdded, JobCompleted, JobDeadlineMissed, JobFailed, JobStarted, SubscriptionToken,
- WorkerStarted, WorkerStopped)
+ JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped)
from ..structures import Job, JobResult
@@ -128,21 +127,22 @@ class Worker(EventSource):
# Check if the job started before the deadline
start_time = datetime.now(timezone.utc)
if job.start_deadline is not None and start_time > job.start_deadline:
- self._events.publish(JobDeadlineMissed.from_job(job, start_time))
+ self._events.publish(
+ JobEnded.from_job(job, JobOutcome.missed_start_deadline, start_time))
return
self._events.publish(JobStarted.from_job(job, start_time))
try:
retval = func(*job.args, **job.kwargs)
except BaseException as exc:
- result = JobResult(job_id=job.id, outcome=JobOutcome.failure, exception=exc)
+ result = JobResult(job_id=job.id, outcome=JobOutcome.error, exception=exc)
self.data_store.release_job(self.identity, job.task_id, result)
- self._events.publish(JobFailed.from_exception(job, start_time, exc))
+ self._events.publish(JobEnded.from_job(job, JobOutcome.error, start_time))
if not isinstance(exc, Exception):
raise
else:
result = JobResult(job_id=job.id, outcome=JobOutcome.success, return_value=retval)
self.data_store.release_job(self.identity, job.task_id, result)
- self._events.publish(JobCompleted.from_retval(job, start_time, retval))
+ self._events.publish(JobEnded.from_job(job, JobOutcome.success, start_time))
finally:
self._running_jobs.remove(job.id)
diff --git a/tests/test_datastores.py b/tests/test_datastores.py
index 4b01662..4389069 100644
--- a/tests/test_datastores.py
+++ b/tests/test_datastores.py
@@ -385,10 +385,10 @@ class TestAsyncStores:
await datastore.release_job(
'worker_id', acquired[0].task_id,
- JobResult(job_id=acquired[0].id, outcome=JobOutcome.failure,
+ JobResult(job_id=acquired[0].id, outcome=JobOutcome.error,
exception=ValueError('foo')))
result = await datastore.get_job_result(acquired[0].id)
- assert result.outcome is JobOutcome.failure
+ assert result.outcome is JobOutcome.error
assert isinstance(result.exception, ValueError)
assert result.exception.args == ('foo',)
assert result.return_value is None
diff --git a/tests/test_workers.py b/tests/test_workers.py
index d9127e7..1a98801 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -8,9 +8,9 @@ from anyio import fail_after
from apscheduler.abc import Job
from apscheduler.datastores.memory import MemoryDataStore
+from apscheduler.enums import JobOutcome
from apscheduler.events import (
- Event, JobAdded, JobCompleted, JobDeadlineMissed, JobFailed, JobStarted, TaskAdded,
- WorkerStarted, WorkerStopped)
+ Event, JobAdded, JobEnded, JobStarted, TaskAdded, WorkerStarted, WorkerStopped)
from apscheduler.structures import Task
from apscheduler.workers.async_ import AsyncWorker
from apscheduler.workers.sync import Worker
@@ -83,14 +83,12 @@ class TestAsyncWorker:
received_event = received_events.pop(0)
if fail:
# Then the job failed
- assert isinstance(received_event, JobFailed)
- assert isinstance(received_event.exc_type, str)
- assert isinstance(received_event.exc_val, str)
- assert isinstance(received_event.exc_tb, str)
+ assert isinstance(received_event, JobEnded)
+ assert received_event.outcome is JobOutcome.error
else:
# Then the job finished successfully
- assert isinstance(received_event, JobCompleted)
- assert received_event.return_value == "((1, 2), {'x': 'foo'})"
+ assert isinstance(received_event, JobEnded)
+ assert received_event.outcome is JobOutcome.success
# Finally, the worker was stopped
received_event = received_events.pop(0)
@@ -138,7 +136,8 @@ class TestAsyncWorker:
# Then the deadline was missed
received_event = received_events.pop(0)
- assert isinstance(received_event, JobDeadlineMissed)
+ assert isinstance(received_event, JobEnded)
+ assert received_event.outcome is JobOutcome.missed_start_deadline
assert received_event.job_id == job.id
assert received_event.task_id == 'task_id'
assert received_event.schedule_id == 'foo'
@@ -196,14 +195,12 @@ class TestSyncWorker:
received_event = received_events.pop(0)
if fail:
# Then the job failed
- assert isinstance(received_event, JobFailed)
- assert isinstance(received_event.exc_type, str)
- assert isinstance(received_event.exc_val, str)
- assert isinstance(received_event.exc_tb, str)
+ assert isinstance(received_event, JobEnded)
+ assert received_event.outcome is JobOutcome.error
else:
# Then the job finished successfully
- assert isinstance(received_event, JobCompleted)
- assert received_event.return_value == "((1, 2), {'x': 'foo'})"
+ assert isinstance(received_event, JobEnded)
+ assert received_event.outcome is JobOutcome.success
# Finally, the worker was stopped
received_event = received_events.pop(0)
@@ -250,7 +247,8 @@ class TestSyncWorker:
# Then the deadline was missed
received_event = received_events.pop(0)
- assert isinstance(received_event, JobDeadlineMissed)
+ assert isinstance(received_event, JobEnded)
+ assert received_event.outcome is JobOutcome.missed_start_deadline
assert received_event.job_id == job.id
assert received_event.task_id == 'task_id'
assert received_event.schedule_id == 'foo'