summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2023-02-01 00:03:08 +0200
committerAlex Grönholm <alex.gronholm@nextday.fi>2023-02-01 00:31:42 +0200
commit5516767613043b28bd31ac64931e8f5aa653af0b (patch)
tree9713217e91be758b3ca4f35f3ba0b88df41b379c
parenta5e2cadb8a60ded5b57169e0c080782e976dc99b (diff)
downloadapscheduler-5516767613043b28bd31ac64931e8f5aa653af0b.tar.gz
Fixed compatibility with SQLAlchemy 2.0
This also bumps the minimum SQLAlchemy version to 1.4. Fixes #707. Fixes #705.
-rw-r--r--apscheduler/jobstores/sqlalchemy.py67
-rw-r--r--docs/versionhistory.rst6
-rw-r--r--setup.py2
3 files changed, 44 insertions, 31 deletions
diff --git a/apscheduler/jobstores/sqlalchemy.py b/apscheduler/jobstores/sqlalchemy.py
index dcfd3e5..716549b 100644
--- a/apscheduler/jobstores/sqlalchemy.py
+++ b/apscheduler/jobstores/sqlalchemy.py
@@ -57,7 +57,7 @@ class SQLAlchemyJobStore(BaseJobStore):
# 25 = precision that translates to an 8-byte float
self.jobs_t = Table(
tablename, metadata,
- Column('id', Unicode(191, _warn_on_bytestring=False), primary_key=True),
+ Column('id', Unicode(191), primary_key=True),
Column('next_run_time', Float(25), index=True),
Column('job_state', LargeBinary, nullable=False),
schema=tableschema
@@ -68,20 +68,22 @@ class SQLAlchemyJobStore(BaseJobStore):
self.jobs_t.create(self.engine, True)
def lookup_job(self, job_id):
- selectable = select([self.jobs_t.c.job_state]).where(self.jobs_t.c.id == job_id)
- job_state = self.engine.execute(selectable).scalar()
- return self._reconstitute_job(job_state) if job_state else None
+ selectable = select(self.jobs_t.c.job_state).where(self.jobs_t.c.id == job_id)
+ with self.engine.begin() as connection:
+ job_state = connection.execute(selectable).scalar()
+ return self._reconstitute_job(job_state) if job_state else None
def get_due_jobs(self, now):
timestamp = datetime_to_utc_timestamp(now)
return self._get_jobs(self.jobs_t.c.next_run_time <= timestamp)
def get_next_run_time(self):
- selectable = select([self.jobs_t.c.next_run_time]).\
+ selectable = select(self.jobs_t.c.next_run_time).\
where(self.jobs_t.c.next_run_time != null()).\
order_by(self.jobs_t.c.next_run_time).limit(1)
- next_run_time = self.engine.execute(selectable).scalar()
- return utc_timestamp_to_datetime(next_run_time)
+ with self.engine.begin() as connection:
+ next_run_time = connection.execute(selectable).scalar()
+ return utc_timestamp_to_datetime(next_run_time)
def get_all_jobs(self):
jobs = self._get_jobs()
@@ -94,29 +96,33 @@ class SQLAlchemyJobStore(BaseJobStore):
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol)
})
- try:
- self.engine.execute(insert)
- except IntegrityError:
- raise ConflictingIdError(job.id)
+ with self.engine.begin() as connection:
+ try:
+ connection.execute(insert)
+ except IntegrityError:
+ raise ConflictingIdError(job.id)
def update_job(self, job):
update = self.jobs_t.update().values(**{
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol)
}).where(self.jobs_t.c.id == job.id)
- result = self.engine.execute(update)
- if result.rowcount == 0:
- raise JobLookupError(job.id)
+ with self.engine.begin() as connection:
+ result = connection.execute(update)
+ if result.rowcount == 0:
+ raise JobLookupError(job.id)
def remove_job(self, job_id):
delete = self.jobs_t.delete().where(self.jobs_t.c.id == job_id)
- result = self.engine.execute(delete)
- if result.rowcount == 0:
- raise JobLookupError(job_id)
+ with self.engine.begin() as connection:
+ result = connection.execute(delete)
+ if result.rowcount == 0:
+ raise JobLookupError(job_id)
def remove_all_jobs(self):
delete = self.jobs_t.delete()
- self.engine.execute(delete)
+ with self.engine.begin() as connection:
+ connection.execute(delete)
def shutdown(self):
self.engine.dispose()
@@ -132,21 +138,22 @@ class SQLAlchemyJobStore(BaseJobStore):
def _get_jobs(self, *conditions):
jobs = []
- selectable = select([self.jobs_t.c.id, self.jobs_t.c.job_state]).\
+ selectable = select(self.jobs_t.c.id, self.jobs_t.c.job_state).\
order_by(self.jobs_t.c.next_run_time)
selectable = selectable.where(and_(*conditions)) if conditions else selectable
failed_job_ids = set()
- for row in self.engine.execute(selectable):
- try:
- jobs.append(self._reconstitute_job(row.job_state))
- except BaseException:
- self._logger.exception('Unable to restore job "%s" -- removing it', row.id)
- failed_job_ids.add(row.id)
-
- # Remove all the jobs we failed to restore
- if failed_job_ids:
- delete = self.jobs_t.delete().where(self.jobs_t.c.id.in_(failed_job_ids))
- self.engine.execute(delete)
+ with self.engine.begin() as connection:
+ for row in connection.execute(selectable):
+ try:
+ jobs.append(self._reconstitute_job(row.job_state))
+ except BaseException:
+ self._logger.exception('Unable to restore job "%s" -- removing it', row.id)
+ failed_job_ids.add(row.id)
+
+ # Remove all the jobs we failed to restore
+ if failed_job_ids:
+ delete = self.jobs_t.delete().where(self.jobs_t.c.id.in_(failed_job_ids))
+ connection.execute(delete)
return jobs
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 55490fc..d09bbbd 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -4,6 +4,12 @@ Version history
To find out how to migrate your application from a previous version of
APScheduler, see the :doc:`migration section <migration>`.
+3.9.2
+-----
+
+* Fixed compatibility with SQLAlchemy 2.0 and bumped minimum supported version to 1.4
+
+
3.9.1
-----
diff --git a/setup.py b/setup.py
index ab9fe8c..90933d6 100644
--- a/setup.py
+++ b/setup.py
@@ -54,7 +54,7 @@ setup(
'mongodb': ['pymongo >= 3.0'],
'redis': ['redis >= 3.0'],
'rethinkdb': ['rethinkdb >= 2.4.0'],
- 'sqlalchemy': ['sqlalchemy >= 0.8'],
+ 'sqlalchemy': ['sqlalchemy >= 1.4'],
'tornado': ['tornado >= 4.3'],
'twisted': ['twisted'],
'zookeeper': ['kazoo'],