diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-26 12:37:03 +0300 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2021-09-26 12:37:03 +0300 |
commit | 7a21d0e1281867e1cef3ff2a7ec2456562e0ee79 (patch) | |
tree | a23d17d0ff0ef4bd525d82e7a47a32a5c72a1af5 | |
parent | b421037421bde1d139e3844f6067ee3f1aeb6852 (diff) | |
download | apscheduler-7a21d0e1281867e1cef3ff2a7ec2456562e0ee79.tar.gz |
Lock schedules and jobs for update while acquiring
-rw-r--r-- | src/apscheduler/datastores/async_sqlalchemy.py | 3 | ||||
-rw-r--r-- | src/apscheduler/datastores/sqlalchemy.py | 3 |
2 files changed, 4 insertions, 2 deletions
diff --git a/src/apscheduler/datastores/async_sqlalchemy.py b/src/apscheduler/datastores/async_sqlalchemy.py index 989fa25..8f1632e 100644 --- a/src/apscheduler/datastores/async_sqlalchemy.py +++ b/src/apscheduler/datastores/async_sqlalchemy.py @@ -203,7 +203,7 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore): or_(self.t_schedules.c.acquired_until.is_(None), self.t_schedules.c.acquired_until < now))).\ order_by(self.t_schedules.c.next_fire_time).\ - limit(limit).cte() + limit(limit).with_for_update(skip_locked=True).cte() subselect = select([schedules_cte.c.id]) update = self.t_schedules.update().\ where(self.t_schedules.c.id.in_(subselect)).\ @@ -318,6 +318,7 @@ class AsyncSQLAlchemyDataStore(_BaseSQLAlchemyDataStore, AsyncDataStore): where(or_(self.t_jobs.c.acquired_until.is_(None), self.t_jobs.c.acquired_until < now)).\ order_by(self.t_jobs.c.created_at).\ + with_for_update(skip_locked=True).\ limit(limit) result = await conn.execute(query) diff --git a/src/apscheduler/datastores/sqlalchemy.py b/src/apscheduler/datastores/sqlalchemy.py index 4eb8ad1..225677e 100644 --- a/src/apscheduler/datastores/sqlalchemy.py +++ b/src/apscheduler/datastores/sqlalchemy.py @@ -336,7 +336,7 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore): or_(self.t_schedules.c.acquired_until.is_(None), self.t_schedules.c.acquired_until < now))).\ order_by(self.t_schedules.c.next_fire_time).\ - limit(limit).cte() + limit(limit).with_for_update(skip_locked=True).cte() subselect = select([schedules_cte.c.id]) update = self.t_schedules.update().\ where(self.t_schedules.c.id.in_(subselect)).\ @@ -451,6 +451,7 @@ class SQLAlchemyDataStore(_BaseSQLAlchemyDataStore, DataStore): where(or_(self.t_jobs.c.acquired_until.is_(None), self.t_jobs.c.acquired_until < now)).\ order_by(self.t_jobs.c.created_at).\ + with_for_update(skip_locked=True).\ limit(limit) result = conn.execute(query) |