diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2023-02-01 00:03:08 +0200 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2023-02-01 00:31:42 +0200 |
commit | 5516767613043b28bd31ac64931e8f5aa653af0b (patch) | |
tree | 9713217e91be758b3ca4f35f3ba0b88df41b379c | |
parent | a5e2cadb8a60ded5b57169e0c080782e976dc99b (diff) | |
download | apscheduler-5516767613043b28bd31ac64931e8f5aa653af0b.tar.gz |
Fixed compatibility with SQLAlchemy 2.0
This also bumps the minimum SQLAlchemy version to 1.4.
Fixes #707. Fixes #705.
-rw-r--r-- | apscheduler/jobstores/sqlalchemy.py | 67 | ||||
-rw-r--r-- | docs/versionhistory.rst | 6 | ||||
-rw-r--r-- | setup.py | 2 |
3 files changed, 44 insertions, 31 deletions
diff --git a/apscheduler/jobstores/sqlalchemy.py b/apscheduler/jobstores/sqlalchemy.py index dcfd3e5..716549b 100644 --- a/apscheduler/jobstores/sqlalchemy.py +++ b/apscheduler/jobstores/sqlalchemy.py @@ -57,7 +57,7 @@ class SQLAlchemyJobStore(BaseJobStore): # 25 = precision that translates to an 8-byte float self.jobs_t = Table( tablename, metadata, - Column('id', Unicode(191, _warn_on_bytestring=False), primary_key=True), + Column('id', Unicode(191), primary_key=True), Column('next_run_time', Float(25), index=True), Column('job_state', LargeBinary, nullable=False), schema=tableschema @@ -68,20 +68,22 @@ class SQLAlchemyJobStore(BaseJobStore): self.jobs_t.create(self.engine, True) def lookup_job(self, job_id): - selectable = select([self.jobs_t.c.job_state]).where(self.jobs_t.c.id == job_id) - job_state = self.engine.execute(selectable).scalar() - return self._reconstitute_job(job_state) if job_state else None + selectable = select(self.jobs_t.c.job_state).where(self.jobs_t.c.id == job_id) + with self.engine.begin() as connection: + job_state = connection.execute(selectable).scalar() + return self._reconstitute_job(job_state) if job_state else None def get_due_jobs(self, now): timestamp = datetime_to_utc_timestamp(now) return self._get_jobs(self.jobs_t.c.next_run_time <= timestamp) def get_next_run_time(self): - selectable = select([self.jobs_t.c.next_run_time]).\ + selectable = select(self.jobs_t.c.next_run_time).\ where(self.jobs_t.c.next_run_time != null()).\ order_by(self.jobs_t.c.next_run_time).limit(1) - next_run_time = self.engine.execute(selectable).scalar() - return utc_timestamp_to_datetime(next_run_time) + with self.engine.begin() as connection: + next_run_time = connection.execute(selectable).scalar() + return utc_timestamp_to_datetime(next_run_time) def get_all_jobs(self): jobs = self._get_jobs() @@ -94,29 +96,33 @@ class SQLAlchemyJobStore(BaseJobStore): 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol) }) - try: - self.engine.execute(insert) - except IntegrityError: - raise ConflictingIdError(job.id) + with self.engine.begin() as connection: + try: + connection.execute(insert) + except IntegrityError: + raise ConflictingIdError(job.id) def update_job(self, job): update = self.jobs_t.update().values(**{ 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol) }).where(self.jobs_t.c.id == job.id) - result = self.engine.execute(update) - if result.rowcount == 0: - raise JobLookupError(job.id) + with self.engine.begin() as connection: + result = connection.execute(update) + if result.rowcount == 0: + raise JobLookupError(job.id) def remove_job(self, job_id): delete = self.jobs_t.delete().where(self.jobs_t.c.id == job_id) - result = self.engine.execute(delete) - if result.rowcount == 0: - raise JobLookupError(job_id) + with self.engine.begin() as connection: + result = connection.execute(delete) + if result.rowcount == 0: + raise JobLookupError(job_id) def remove_all_jobs(self): delete = self.jobs_t.delete() - self.engine.execute(delete) + with self.engine.begin() as connection: + connection.execute(delete) def shutdown(self): self.engine.dispose() @@ -132,21 +138,22 @@ class SQLAlchemyJobStore(BaseJobStore): def _get_jobs(self, *conditions): jobs = [] - selectable = select([self.jobs_t.c.id, self.jobs_t.c.job_state]).\ + selectable = select(self.jobs_t.c.id, self.jobs_t.c.job_state).\ order_by(self.jobs_t.c.next_run_time) selectable = selectable.where(and_(*conditions)) if conditions else selectable failed_job_ids = set() - for row in self.engine.execute(selectable): - try: - jobs.append(self._reconstitute_job(row.job_state)) - except BaseException: - self._logger.exception('Unable to restore job "%s" -- removing it', row.id) - failed_job_ids.add(row.id) - - # Remove all the jobs we failed to restore - if failed_job_ids: - delete = self.jobs_t.delete().where(self.jobs_t.c.id.in_(failed_job_ids)) - self.engine.execute(delete) + with self.engine.begin() as connection: + for row in connection.execute(selectable): + try: + jobs.append(self._reconstitute_job(row.job_state)) + except BaseException: + self._logger.exception('Unable to restore job "%s" -- removing it', row.id) + failed_job_ids.add(row.id) + + # Remove all the jobs we failed to restore + if failed_job_ids: + delete = self.jobs_t.delete().where(self.jobs_t.c.id.in_(failed_job_ids)) + connection.execute(delete) return jobs diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 55490fc..d09bbbd 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -4,6 +4,12 @@ Version history To find out how to migrate your application from a previous version of APScheduler, see the :doc:`migration section <migration>`. +3.9.2 +----- + +* Fixed compatibility with SQLAlchemy 2.0 and bumped minimum supported version to 1.4 + + 3.9.1 ----- @@ -54,7 +54,7 @@ setup( 'mongodb': ['pymongo >= 3.0'], 'redis': ['redis >= 3.0'], 'rethinkdb': ['rethinkdb >= 2.4.0'], - 'sqlalchemy': ['sqlalchemy >= 0.8'], + 'sqlalchemy': ['sqlalchemy >= 1.4'], 'tornado': ['tornado >= 4.3'], 'twisted': ['twisted'], 'zookeeper': ['kazoo'], |