Diffstat (limited to 'src/apscheduler/datastores/sqlalchemy.py')
-rw-r--r--  src/apscheduler/datastores/sqlalchemy.py  17
1 file changed, 6 insertions(+), 11 deletions(-)
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index 01e31bb..9c7c905 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -37,7 +37,6 @@ from .._events import (
     JobAcquired,
     JobAdded,
     JobDeserializationFailed,
-    JobReleased,
     ScheduleAdded,
     ScheduleDeserializationFailed,
     ScheduleRemoved,
@@ -178,6 +177,7 @@ class _BaseSQLAlchemyDataStore:
Column("scheduled_fire_time", timestamp_type),
Column("jitter", interval_type),
Column("start_deadline", timestamp_type),
+ Column("result_expiration_time", interval_type),
Column("tags", tags_type, nullable=False),
Column("created_at", timestamp_type, nullable=False),
Column("started_at", timestamp_type),
@@ -190,6 +190,7 @@ class _BaseSQLAlchemyDataStore:
Column("job_id", job_id_type, primary_key=True),
Column("outcome", Enum(JobOutcome), nullable=False),
Column("finished_at", timestamp_type, index=True),
+ Column("expires_at", timestamp_type, nullable=False, index=True),
Column("exception", LargeBinary),
Column("return_value", LargeBinary),
)
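For reference, a minimal standalone sketch of the amended job_results table follows. It is an assumption-laden stand-in, not the datastore's actual definition: SQLAlchemy 2.0's Uuid and a timezone-aware DateTime replace the store's configurable job_id_type and timestamp_type, and the JobOutcome enum here is a placeholder with illustrative members only.

import enum

from sqlalchemy import Column, DateTime, Enum, LargeBinary, MetaData, Table, Uuid


class JobOutcome(enum.Enum):
    # Placeholder for apscheduler's JobOutcome enum; member names are illustrative
    success = "success"
    error = "error"


metadata = MetaData()

# expires_at is NOT NULL and indexed so that expired result rows can be
# located and purged cheaply.
t_job_results = Table(
    "job_results",
    metadata,
    Column("job_id", Uuid, primary_key=True),
    Column("outcome", Enum(JobOutcome), nullable=False),
    Column("finished_at", DateTime(timezone=True), index=True),
    Column("expires_at", DateTime(timezone=True), nullable=False, index=True),
    Column("exception", LargeBinary),
    Column("return_value", LargeBinary),
)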
@@ -672,9 +673,10 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore):
         for attempt in self._retry():
             with attempt, self.engine.begin() as conn:
                 # Insert the job result
-                marshalled = result.marshal(self.serializer)
-                insert = self.t_job_results.insert().values(**marshalled)
-                conn.execute(insert)
+                if result.expires_at > result.finished_at:
+                    marshalled = result.marshal(self.serializer)
+                    insert = self.t_job_results.insert().values(**marshalled)
+                    conn.execute(insert)
 
                 # Decrement the running jobs counter
                 update = (
@@ -688,13 +690,6 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, BaseDataStore):
                 delete = self.t_jobs.delete().where(self.t_jobs.c.id == result.job_id)
                 conn.execute(delete)
 
-                # Publish the event
-                self._events.publish(
-                    JobReleased(
-                        job_id=result.job_id, worker_id=worker_id, outcome=result.outcome
-                    )
-                )
-
     def get_job_result(self, job_id: UUID) -> JobResult | None:
         for attempt in self._retry():
             with attempt, self.engine.begin() as conn:
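Taken together, release_job() now writes a row to t_job_results only when the result's expiration time extends past its finish time. A minimal sketch of that guard follows; should_store_result is a hypothetical helper, and deriving expires_at as finished_at plus the job's result_expiration_time is an assumption made for illustration.

from datetime import datetime, timedelta, timezone


def should_store_result(finished_at: datetime, result_expiration_time: timedelta) -> bool:
    # Mirrors the guard added above: a result that expires the moment it is
    # produced (zero retention) is never written to t_job_results.
    expires_at = finished_at + result_expiration_time
    return expires_at > finished_at


now = datetime.now(timezone.utc)
print(should_store_result(now, timedelta(minutes=15)))  # True: kept for later get_job_result()
print(should_store_result(now, timedelta(0)))           # False: fire-and-forget, nothing stored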