diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2014-02-01 17:52:47 +0200 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2014-03-09 10:13:55 +0200 |
commit | edbc940a9d9206b0a81d4067484e8a8cadaa035b (patch) | |
tree | 50dcfca1d6b5157cf39cff8b9230f1b93385cd2e /apscheduler/jobstores/sqlalchemy.py | |
parent | 17aae6293ce5f090477d48e15d25cf5b34a906ad (diff) | |
download | apscheduler-edbc940a9d9206b0a81d4067484e8a8cadaa035b.tar.gz |
Overhauled the job store system and ditched ShelveJobStore and RedisJobStore in the process
Diffstat (limited to 'apscheduler/jobstores/sqlalchemy.py')
-rw-r--r-- | apscheduler/jobstores/sqlalchemy.py | 142 |
1 files changed, 97 insertions, 45 deletions
diff --git a/apscheduler/jobstores/sqlalchemy.py b/apscheduler/jobstores/sqlalchemy.py index 5ce368c..a4934bc 100644 --- a/apscheduler/jobstores/sqlalchemy.py +++ b/apscheduler/jobstores/sqlalchemy.py @@ -5,85 +5,137 @@ from __future__ import absolute_import import pickle import logging -import sqlalchemy +import six -from apscheduler.jobstores.base import JobStore +from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError +from apscheduler.util import maybe_ref, utc_to_tzaware, tzaware_to_utc from apscheduler.job import Job try: - from sqlalchemy import * + from sqlalchemy import (create_engine, Table, PickleType, Column, MetaData, Unicode, DateTime, LargeBinary, select, + __version__ as sqlalchemy_version) + from sqlalchemy.exc import IntegrityError except ImportError: # pragma: nocover raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed') logger = logging.getLogger(__name__) -class SQLAlchemyJobStore(JobStore): - def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', metadata=None, +class SQLAlchemyJobStore(BaseJobStore): + def __init__(self, scheduler, url=None, engine=None, tablename='apscheduler_jobs', metadata=None, pickle_protocol=pickle.HIGHEST_PROTOCOL): - self.jobs = [] + self.scheduler = scheduler self.pickle_protocol = pickle_protocol + metadata = maybe_ref(metadata) or MetaData() if engine: - self.engine = engine + self.engine = maybe_ref(engine) elif url: - self.engine = create_engine(url) + self.engine = create_engine(url, echo=True) else: raise ValueError('Need either "engine" or "url" defined') - if sqlalchemy.__version__ < '0.7': + if sqlalchemy_version < '0.7': # pragma: nocover pickle_coltype = PickleType(pickle_protocol, mutable=False) else: pickle_coltype = PickleType(pickle_protocol) self.jobs_t = Table( - tablename, metadata or MetaData(), - Column('id', Integer, Sequence(tablename + '_id_seq', optional=True), primary_key=True), - Column('trigger', pickle_coltype, nullable=False), - Column('func_ref', String(1024), nullable=False), - Column('args', pickle_coltype, nullable=False), - Column('kwargs', pickle_coltype, nullable=False), - Column('name', Unicode(1024)), - Column('misfire_grace_time', Integer, nullable=False), - Column('coalesce', Boolean, nullable=False), - Column('max_runs', Integer), - Column('max_instances', Integer), - Column('next_run_time', DateTime, nullable=False), - Column('runs', BigInteger)) + tablename, metadata, + Column('id', Unicode(1024), primary_key=True), + Column('next_run_time', DateTime, index=True), + Column('job_data', pickle_coltype, nullable=False) + ) self.jobs_t.create(self.engine, True) + def lookup_job(self, id): + selectable = select([self.jobs_t]).where(self.jobs_t.c.id == id) + row = self.engine.execute(selectable).fetchone() + if row is None: + raise JobLookupError(id) + return self._reconstitute_job(row) + + def get_pending_jobs(self, now): + selectable = select([self.jobs_t]).where(self.jobs_t.c.next_run_time <= now).\ + order_by(self.jobs_t.c.next_run_time) + jobs = self._get_jobs(selectable) + + selectable = select([self.jobs_t.c.next_run_time]).where(self.jobs_t.c.next_run_time > now).\ + order_by(self.jobs_t.c.next_run_time).limit(1) + next_run_time = self.engine.execute(selectable).scalar() + + return jobs, utc_to_tzaware(next_run_time, self.scheduler.timezone) + + def get_all_jobs(self): + selectable = select([self.jobs_t]).order_by(self.jobs_t.c.next_run_time) + return self._get_jobs(selectable) + def add_job(self, job): job_dict = job.__getstate__() - result = self.engine.execute(self.jobs_t.insert().values(**job_dict)) - job.id = result.inserted_primary_key[0] - self.jobs.append(job) + row_dict = { + 'id': job_dict.pop('id'), + 'next_run_time': tzaware_to_utc(job_dict.pop('next_run_time')), + 'job_data': job_dict + } + + insert = self.jobs_t.insert().values(**row_dict) + try: + self.engine.execute(insert) + except IntegrityError: + raise ConflictingIdError(job.id) + + def update_job(self, id, changes): + selectable = select([self.jobs_t]).where(self.jobs_t.c.id == id) + row = self.engine.execute(selectable).fetchone() + if row is None: + raise JobLookupError(id) + job_dict = dict(row.items())['job_data'] + + row_changes = {} + if 'id' in changes: + row_changes['id'] = changes.pop('id') + if 'next_run_time' in changes: + row_changes['next_run_time'] = tzaware_to_utc(changes['next_run_time']) + if changes: + job_dict.update(changes) + row_changes['job_data'] = job_dict + + update = self.jobs_t.update().where(self.jobs_t.c.id == id).values(**row_changes) + try: + self.engine.execute(update) + except IntegrityError: + raise ConflictingIdError(row_changes['id']) + + def remove_job(self, id): + delete = self.jobs_t.delete().where(self.jobs_t.c.id == id) + result = self.engine.execute(delete) + if result.rowcount == 0: + raise JobLookupError(id) - def remove_job(self, job): - delete = self.jobs_t.delete().where(self.jobs_t.c.id == job.id) + def remove_all_jobs(self): + delete = self.jobs_t.delete() self.engine.execute(delete) - self.jobs.remove(job) - def load_jobs(self): + def close(self): + self.engine.dispose() + + def _reconstitute_job(self, row): + job = Job.__new__(Job) + job_dict = dict(row.items()) + job_dict.update(job_dict.pop('job_data')) + job_dict['next_run_time'] = utc_to_tzaware(job_dict['next_run_time'], self.scheduler.timezone) + job.__setstate__(job_dict) + return job + + def _get_jobs(self, selectable): jobs = [] - for row in self.engine.execute(select([self.jobs_t])): + for row in self.engine.execute(selectable): try: - job = Job.__new__(Job) - job_dict = dict(row.items()) - job.__setstate__(job_dict) + job = self._reconstitute_job(row) jobs.append(job) except Exception: - job_name = job_dict.get('name', '(unknown)') - logger.exception('Unable to restore job "%s"', job_name) - self.jobs = jobs - - def update_job(self, job): - job_dict = job.__getstate__() - update = self.jobs_t.update().where(self.jobs_t.c.id == job.id).\ - values(next_run_time=job_dict['next_run_time'], runs=job_dict['runs']) - self.engine.execute(update) - - def close(self): - self.engine.dispose() + logger.exception(six.u('Unable to restore job (id=%s)'), row.id) + return jobs def __repr__(self): return '<%s (url=%s)>' % (self.__class__.__name__, self.engine.url) |