diff options
-rw-r--r-- | src/apscheduler/enums.py | 2 | ||||
-rw-r--r-- | src/apscheduler/events.py | 64 | ||||
-rw-r--r-- | src/apscheduler/structures.py | 2 | ||||
-rw-r--r-- | src/apscheduler/workers/async_.py | 15 | ||||
-rw-r--r-- | src/apscheduler/workers/sync.py | 12 | ||||
-rw-r--r-- | tests/test_datastores.py | 4 | ||||
-rw-r--r-- | tests/test_workers.py | 30 |
7 files changed, 42 insertions, 87 deletions
diff --git a/src/apscheduler/enums.py b/src/apscheduler/enums.py index f9300de..5a0929c 100644 --- a/src/apscheduler/enums.py +++ b/src/apscheduler/enums.py @@ -10,7 +10,7 @@ class RunState(Enum): class JobOutcome(Enum): success = auto() - failure = auto() + error = auto() missed_start_deadline = auto() cancelled = auto() expired = auto() diff --git a/src/apscheduler/events.py b/src/apscheduler/events.py index 49703d9..cd63ea3 100644 --- a/src/apscheduler/events.py +++ b/src/apscheduler/events.py @@ -2,7 +2,6 @@ from __future__ import annotations from datetime import datetime, timezone from functools import partial -from traceback import format_tb from typing import Any, Callable, NewType, Optional from uuid import UUID @@ -10,6 +9,7 @@ import attr from attr.converters import optional from .converters import as_aware_datetime, as_uuid +from .enums import JobOutcome from .structures import Job SubscriptionToken = NewType('SubscriptionToken', object) @@ -136,7 +136,7 @@ class JobExecutionEvent(WorkerEvent): @attr.define(kw_only=True, frozen=True) class JobStarted(JobExecutionEvent): - """Signals that a worker has started running a job.""" + """Signals that a worker has started processing a job.""" @classmethod def from_job(cls, job: Job, start_time: datetime) -> JobStarted: @@ -147,62 +147,18 @@ class JobStarted(JobExecutionEvent): @attr.define(kw_only=True, frozen=True) -class JobDeadlineMissed(JobExecutionEvent): - """Signals that a worker has skipped a job because its deadline was missed.""" +class JobEnded(JobExecutionEvent): + """Signals that a worker has finished processing of a job.""" + + outcome: JobOutcome + start_time: datetime = attr.field(converter=as_aware_datetime) @classmethod - def from_job(cls, job: Job, start_time: datetime) -> JobDeadlineMissed: - return JobDeadlineMissed( + def from_job(cls, job: Job, outcome: JobOutcome, start_time: datetime) -> JobEnded: + return JobEnded( timestamp=datetime.now(timezone.utc), job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id, scheduled_fire_time=job.scheduled_fire_time, - start_deadline=job.start_deadline) - - -@attr.define(kw_only=True, frozen=True) -class JobCompleted(JobExecutionEvent): - """Signals that a worker has successfully run a job.""" - start_time: datetime = attr.field(converter=optional(as_aware_datetime)) - return_value: str - - @classmethod - def from_retval(cls, job: Job, start_time: datetime, return_value: Any) -> JobCompleted: - return JobCompleted(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id, - scheduled_fire_time=job.scheduled_fire_time, start_time=start_time, - start_deadline=job.start_deadline, return_value=repr(return_value)) - - -@attr.define(kw_only=True, frozen=True) -class JobCancelled(JobExecutionEvent): - """Signals that a job was cancelled.""" - start_time: datetime = attr.field(converter=optional(as_aware_datetime)) - - @classmethod - def from_job(cls, job: Job, start_time: datetime) -> JobCancelled: - return JobCancelled(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id, - scheduled_fire_time=job.scheduled_fire_time, start_time=start_time, - start_deadline=job.start_deadline) - - -@attr.define(kw_only=True, frozen=True) -class JobFailed(JobExecutionEvent): - """Signals that a worker encountered an exception while running a job.""" - start_time: datetime - exc_type: str - exc_val: str - exc_tb: str - - @classmethod - def from_exception(cls, job: Job, start_time: datetime, exception: BaseException) -> JobFailed: - if exception.__class__.__module__ == 'builtins': - exc_type = exception.__class__.__qualname__ - else: - exc_type = f'{exception.__class__.__module__}.{exception.__class__.__qualname__}' - - return JobFailed(job_id=job.id, task_id=job.task_id, schedule_id=job.schedule_id, - scheduled_fire_time=job.scheduled_fire_time, start_time=start_time, - start_deadline=job.start_deadline, exc_type=exc_type, - exc_val=str(exception), - exc_tb='\n'.join(format_tb(exception.__traceback__))) + start_deadline=job.start_deadline, outcome=outcome, start_time=start_time) # diff --git a/src/apscheduler/structures.py b/src/apscheduler/structures.py index bafb61c..4d1bc18 100644 --- a/src/apscheduler/structures.py +++ b/src/apscheduler/structures.py @@ -133,7 +133,7 @@ class JobResult: def marshal(self, serializer: abc.Serializer) -> dict[str, Any]: marshalled = attr.asdict(self) marshalled['outcome'] = self.outcome.name - if self.outcome is JobOutcome.failure: + if self.outcome is JobOutcome.error: marshalled['exception'] = serializer.serialize(self.exception) else: del marshalled['exception'] diff --git a/src/apscheduler/workers/async_.py b/src/apscheduler/workers/async_.py index 0893a98..7acef67 100644 --- a/src/apscheduler/workers/async_.py +++ b/src/apscheduler/workers/async_.py @@ -18,8 +18,7 @@ from ..datastores.async_adapter import AsyncDataStoreAdapter from ..enums import JobOutcome, RunState from ..eventbrokers.async_local import LocalAsyncEventBroker from ..events import ( - Event, JobAdded, JobCancelled, JobCompleted, JobDeadlineMissed, JobFailed, JobStarted, - SubscriptionToken, WorkerStarted, WorkerStopped) + Event, JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped) from ..structures import JobResult @@ -129,7 +128,8 @@ class AsyncWorker(EventSource): # Check if the job started before the deadline start_time = datetime.now(timezone.utc) if job.start_deadline is not None and start_time > job.start_deadline: - await self._events.publish(JobDeadlineMissed.from_job(job, start_time)) + await self._events.publish( + JobEnded.from_job(job, JobOutcome.missed_start_deadline, start_time)) return await self._events.publish(JobStarted.from_job(job, start_time)) @@ -143,17 +143,18 @@ class AsyncWorker(EventSource): await self.data_store.release_job(self.identity, job.task_id, result) with move_on_after(1, shield=True): - await self._events.publish(JobCancelled.from_job(job, start_time)) + await self._events.publish( + JobEnded.from_job(job, JobOutcome.cancelled, start_time)) except BaseException as exc: - result = JobResult(job_id=job.id, outcome=JobOutcome.failure, exception=exc) + result = JobResult(job_id=job.id, outcome=JobOutcome.error, exception=exc) await self.data_store.release_job(self.identity, job.task_id, result) - await self._events.publish(JobFailed.from_exception(job, start_time, exc)) + await self._events.publish(JobEnded.from_job(job, JobOutcome.error, start_time)) if not isinstance(exc, Exception): raise else: result = JobResult(job_id=job.id, outcome=JobOutcome.success, return_value=retval) await self.data_store.release_job(self.identity, job.task_id, result) - await self._events.publish(JobCompleted.from_retval(job, start_time, retval)) + await self._events.publish(JobEnded.from_job(job, JobOutcome.success, start_time)) finally: self._running_jobs.remove(job.id) diff --git a/src/apscheduler/workers/sync.py b/src/apscheduler/workers/sync.py index 30db6bf..45f0c5f 100644 --- a/src/apscheduler/workers/sync.py +++ b/src/apscheduler/workers/sync.py @@ -15,8 +15,7 @@ from ..abc import DataStore, EventSource from ..enums import JobOutcome, RunState from ..eventbrokers.local import LocalEventBroker from ..events import ( - JobAdded, JobCompleted, JobDeadlineMissed, JobFailed, JobStarted, SubscriptionToken, - WorkerStarted, WorkerStopped) + JobAdded, JobEnded, JobStarted, SubscriptionToken, WorkerStarted, WorkerStopped) from ..structures import Job, JobResult @@ -128,21 +127,22 @@ class Worker(EventSource): # Check if the job started before the deadline start_time = datetime.now(timezone.utc) if job.start_deadline is not None and start_time > job.start_deadline: - self._events.publish(JobDeadlineMissed.from_job(job, start_time)) + self._events.publish( + JobEnded.from_job(job, JobOutcome.missed_start_deadline, start_time)) return self._events.publish(JobStarted.from_job(job, start_time)) try: retval = func(*job.args, **job.kwargs) except BaseException as exc: - result = JobResult(job_id=job.id, outcome=JobOutcome.failure, exception=exc) + result = JobResult(job_id=job.id, outcome=JobOutcome.error, exception=exc) self.data_store.release_job(self.identity, job.task_id, result) - self._events.publish(JobFailed.from_exception(job, start_time, exc)) + self._events.publish(JobEnded.from_job(job, JobOutcome.error, start_time)) if not isinstance(exc, Exception): raise else: result = JobResult(job_id=job.id, outcome=JobOutcome.success, return_value=retval) self.data_store.release_job(self.identity, job.task_id, result) - self._events.publish(JobCompleted.from_retval(job, start_time, retval)) + self._events.publish(JobEnded.from_job(job, JobOutcome.success, start_time)) finally: self._running_jobs.remove(job.id) diff --git a/tests/test_datastores.py b/tests/test_datastores.py index 4b01662..4389069 100644 --- a/tests/test_datastores.py +++ b/tests/test_datastores.py @@ -385,10 +385,10 @@ class TestAsyncStores: await datastore.release_job( 'worker_id', acquired[0].task_id, - JobResult(job_id=acquired[0].id, outcome=JobOutcome.failure, + JobResult(job_id=acquired[0].id, outcome=JobOutcome.error, exception=ValueError('foo'))) result = await datastore.get_job_result(acquired[0].id) - assert result.outcome is JobOutcome.failure + assert result.outcome is JobOutcome.error assert isinstance(result.exception, ValueError) assert result.exception.args == ('foo',) assert result.return_value is None diff --git a/tests/test_workers.py b/tests/test_workers.py index d9127e7..1a98801 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -8,9 +8,9 @@ from anyio import fail_after from apscheduler.abc import Job from apscheduler.datastores.memory import MemoryDataStore +from apscheduler.enums import JobOutcome from apscheduler.events import ( - Event, JobAdded, JobCompleted, JobDeadlineMissed, JobFailed, JobStarted, TaskAdded, - WorkerStarted, WorkerStopped) + Event, JobAdded, JobEnded, JobStarted, TaskAdded, WorkerStarted, WorkerStopped) from apscheduler.structures import Task from apscheduler.workers.async_ import AsyncWorker from apscheduler.workers.sync import Worker @@ -83,14 +83,12 @@ class TestAsyncWorker: received_event = received_events.pop(0) if fail: # Then the job failed - assert isinstance(received_event, JobFailed) - assert isinstance(received_event.exc_type, str) - assert isinstance(received_event.exc_val, str) - assert isinstance(received_event.exc_tb, str) + assert isinstance(received_event, JobEnded) + assert received_event.outcome is JobOutcome.error else: # Then the job finished successfully - assert isinstance(received_event, JobCompleted) - assert received_event.return_value == "((1, 2), {'x': 'foo'})" + assert isinstance(received_event, JobEnded) + assert received_event.outcome is JobOutcome.success # Finally, the worker was stopped received_event = received_events.pop(0) @@ -138,7 +136,8 @@ class TestAsyncWorker: # Then the deadline was missed received_event = received_events.pop(0) - assert isinstance(received_event, JobDeadlineMissed) + assert isinstance(received_event, JobEnded) + assert received_event.outcome is JobOutcome.missed_start_deadline assert received_event.job_id == job.id assert received_event.task_id == 'task_id' assert received_event.schedule_id == 'foo' @@ -196,14 +195,12 @@ class TestSyncWorker: received_event = received_events.pop(0) if fail: # Then the job failed - assert isinstance(received_event, JobFailed) - assert isinstance(received_event.exc_type, str) - assert isinstance(received_event.exc_val, str) - assert isinstance(received_event.exc_tb, str) + assert isinstance(received_event, JobEnded) + assert received_event.outcome is JobOutcome.error else: # Then the job finished successfully - assert isinstance(received_event, JobCompleted) - assert received_event.return_value == "((1, 2), {'x': 'foo'})" + assert isinstance(received_event, JobEnded) + assert received_event.outcome is JobOutcome.success # Finally, the worker was stopped received_event = received_events.pop(0) @@ -250,7 +247,8 @@ class TestSyncWorker: # Then the deadline was missed received_event = received_events.pop(0) - assert isinstance(received_event, JobDeadlineMissed) + assert isinstance(received_event, JobEnded) + assert received_event.outcome is JobOutcome.missed_start_deadline assert received_event.job_id == job.id assert received_event.task_id == 'task_id' assert received_event.schedule_id == 'foo' |