diff options
Diffstat (limited to 'src/apscheduler/datastores/sqlalchemy.py')
-rw-r--r-- | src/apscheduler/datastores/sqlalchemy.py | 17 |
1 files changed, 6 insertions, 11 deletions
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 01e31bb..9c7c905 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -37,7 +37,6 @@ from .._events import ( JobAcquired, JobAdded, JobDeserializationFailed, - JobReleased, ScheduleAdded, ScheduleDeserializationFailed, ScheduleRemoved, @@ -178,6 +177,7 @@ class _BaseSQLAlchemyDataStore: Column("scheduled_fire_time", timestamp_type), Column("jitter", interval_type), Column("start_deadline", timestamp_type), + Column("result_expiration_time", interval_type), Column("tags", tags_type, nullable=False), Column("created_at", timestamp_type, nullable=False), Column("started_at", timestamp_type), @@ -190,6 +190,7 @@ class _BaseSQLAlchemyDataStore: Column("job_id", job_id_type, primary_key=True), Column("outcome", Enum(JobOutcome), nullable=False), Column("finished_at", timestamp_type, index=True), + Column("expires_at", timestamp_type, nullable=False, index=True), Column("exception", LargeBinary), Column("return_value", LargeBinary), ) @@ -672,9 +673,10 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore): for attempt in self._retry(): with attempt, self.engine.begin() as conn: # Insert the job result - marshalled = result.marshal(self.serializer) - insert = self.t_job_results.insert().values(**marshalled) - conn.execute(insert) + if result.expires_at > result.finished_at: + marshalled = result.marshal(self.serializer) + insert = self.t_job_results.insert().values(**marshalled) + conn.execute(insert) # Decrement the running jobs counter update = ( @@ -688,13 +690,6 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore): delete = self.t_jobs.delete().where(self.t_jobs.c.id == result.job_id) conn.execute(delete) - # Publish the event - self._events.publish( - JobReleased( - job_id=result.job_id, worker_id=worker_id, outcome=result.outcome - ) - ) - def get_job_result(self, job_id: UUID) -> JobResult | None: for attempt in self._retry(): with attempt, self.engine.begin() as conn: |