summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2021-09-26 12:37:03 +0300
committerAlex Grönholm <alex.gronholm@nextday.fi>2021-09-26 12:37:03 +0300
commit7a21d0e1281867e1cef3ff2a7ec2456562e0ee79 (patch)
treea23d17d0ff0ef4bd525d82e7a47a32a5c72a1af5
parentb421037421bde1d139e3844f6067ee3f1aeb6852 (diff)
downloadapscheduler-7a21d0e1281867e1cef3ff2a7ec2456562e0ee79.tar.gz
Lock schedules and jobs for update while acquiring
-rw-r--r--src/apscheduler/datastores/async_sqlalchemy.py3
-rw-r--r--src/apscheduler/datastores/sqlalchemy.py3
2 files changed, 4 insertions, 2 deletions
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py
index 989fa25..8f1632e 100644
--- a/src/apscheduler/datastores/async_sqlalchemy.py
+++ b/src/apscheduler/datastores/async_sqlalchemy.py
@@ -203,7 +203,7 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
or_(self.t_schedules.c.acquired_until.is_(None),
self.t_schedules.c.acquired_until < now))).\
order_by(self.t_schedules.c.next_fire_time).\
- limit(limit).cte()
+ limit(limit).with_for_update(skip_locked=True).cte()
subselect = select([schedules_cte.c.id])
update = self.t_schedules.update().\
where(self.t_schedules.c.id.in_(subselect)).\
@@ -318,6 +318,7 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore):
where(or_(self.t_jobs.c.acquired_until.is_(None),
self.t_jobs.c.acquired_until < now)).\
order_by(self.t_jobs.c.created_at).\
+ with_for_update(skip_locked=True).\
limit(limit)
result = await conn.execute(query)
diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py
index 4eb8ad1..225677e 100644
--- a/src/apscheduler/datastores/sqlalchemy.py
+++ b/src/apscheduler/datastores/sqlalchemy.py
@@ -336,7 +336,7 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
or_(self.t_schedules.c.acquired_until.is_(None),
self.t_schedules.c.acquired_until < now))).\
order_by(self.t_schedules.c.next_fire_time).\
- limit(limit).cte()
+ limit(limit).with_for_update(skip_locked=True).cte()
subselect = select([schedules_cte.c.id])
update = self.t_schedules.update().\
where(self.t_schedules.c.id.in_(subselect)).\
@@ -451,6 +451,7 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore):
where(or_(self.t_jobs.c.acquired_until.is_(None),
self.t_jobs.c.acquired_until < now)).\
order_by(self.t_jobs.c.created_at).\
+ with_for_update(skip_locked=True).\
limit(limit)
result = conn.execute(query)