summaryrefslogtreecommitdiff
path: root/apscheduler/jobstores/sqlalchemy.py
diff options
context:
space:
mode:
authorAlex Grönholm <alex.gronholm@nextday.fi>2014-02-01 17:52:47 +0200
committerAlex Grönholm <alex.gronholm@nextday.fi>2014-03-09 10:13:55 +0200
commitedbc940a9d9206b0a81d4067484e8a8cadaa035b (patch)
tree50dcfca1d6b5157cf39cff8b9230f1b93385cd2e /apscheduler/jobstores/sqlalchemy.py
parent17aae6293ce5f090477d48e15d25cf5b34a906ad (diff)
downloadapscheduler-edbc940a9d9206b0a81d4067484e8a8cadaa035b.tar.gz
Overhauled the job store system and ditched ShelveJobStore and RedisJobStore in the process
Diffstat (limited to 'apscheduler/jobstores/sqlalchemy.py')
-rw-r--r--apscheduler/jobstores/sqlalchemy.py142
1 files changed, 97 insertions, 45 deletions
diff --git a/apscheduler/jobstores/sqlalchemy.py b/apscheduler/jobstores/sqlalchemy.py
index 5ce368c..a4934bc 100644
--- a/apscheduler/jobstores/sqlalchemy.py
+++ b/apscheduler/jobstores/sqlalchemy.py
@@ -5,85 +5,137 @@ from __future__ import absolute_import
import pickle
import logging
-import sqlalchemy
+import six
-from apscheduler.jobstores.base import JobStore
+from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
+from apscheduler.util import maybe_ref, utc_to_tzaware, tzaware_to_utc
from apscheduler.job import Job
try:
- from sqlalchemy import *
+ from sqlalchemy import (create_engine, Table, PickleType, Column, MetaData, Unicode, DateTime, LargeBinary, select,
+ __version__ as sqlalchemy_version)
+ from sqlalchemy.exc import IntegrityError
except ImportError: # pragma: nocover
raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')
logger = logging.getLogger(__name__)
-class SQLAlchemyJobStore(JobStore):
- def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', metadata=None,
+class SQLAlchemyJobStore(BaseJobStore):
+ def __init__(self, scheduler, url=None, engine=None, tablename='apscheduler_jobs', metadata=None,
pickle_protocol=pickle.HIGHEST_PROTOCOL):
- self.jobs = []
+ self.scheduler = scheduler
self.pickle_protocol = pickle_protocol
+ metadata = maybe_ref(metadata) or MetaData()
if engine:
- self.engine = engine
+ self.engine = maybe_ref(engine)
elif url:
- self.engine = create_engine(url)
+ self.engine = create_engine(url, echo=True)
else:
raise ValueError('Need either "engine" or "url" defined')
- if sqlalchemy.__version__ < '0.7':
+ if sqlalchemy_version < '0.7': # pragma: nocover
pickle_coltype = PickleType(pickle_protocol, mutable=False)
else:
pickle_coltype = PickleType(pickle_protocol)
self.jobs_t = Table(
- tablename, metadata or MetaData(),
- Column('id', Integer, Sequence(tablename + '_id_seq', optional=True), primary_key=True),
- Column('trigger', pickle_coltype, nullable=False),
- Column('func_ref', String(1024), nullable=False),
- Column('args', pickle_coltype, nullable=False),
- Column('kwargs', pickle_coltype, nullable=False),
- Column('name', Unicode(1024)),
- Column('misfire_grace_time', Integer, nullable=False),
- Column('coalesce', Boolean, nullable=False),
- Column('max_runs', Integer),
- Column('max_instances', Integer),
- Column('next_run_time', DateTime, nullable=False),
- Column('runs', BigInteger))
+ tablename, metadata,
+ Column('id', Unicode(1024), primary_key=True),
+ Column('next_run_time', DateTime, index=True),
+ Column('job_data', pickle_coltype, nullable=False)
+ )
self.jobs_t.create(self.engine, True)
+ def lookup_job(self, id):
+ selectable = select([self.jobs_t]).where(self.jobs_t.c.id == id)
+ row = self.engine.execute(selectable).fetchone()
+ if row is None:
+ raise JobLookupError(id)
+ return self._reconstitute_job(row)
+
+ def get_pending_jobs(self, now):
+ selectable = select([self.jobs_t]).where(self.jobs_t.c.next_run_time <= now).\
+ order_by(self.jobs_t.c.next_run_time)
+ jobs = self._get_jobs(selectable)
+
+ selectable = select([self.jobs_t.c.next_run_time]).where(self.jobs_t.c.next_run_time > now).\
+ order_by(self.jobs_t.c.next_run_time).limit(1)
+ next_run_time = self.engine.execute(selectable).scalar()
+
+ return jobs, utc_to_tzaware(next_run_time, self.scheduler.timezone)
+
+ def get_all_jobs(self):
+ selectable = select([self.jobs_t]).order_by(self.jobs_t.c.next_run_time)
+ return self._get_jobs(selectable)
+
def add_job(self, job):
job_dict = job.__getstate__()
- result = self.engine.execute(self.jobs_t.insert().values(**job_dict))
- job.id = result.inserted_primary_key[0]
- self.jobs.append(job)
+ row_dict = {
+ 'id': job_dict.pop('id'),
+ 'next_run_time': tzaware_to_utc(job_dict.pop('next_run_time')),
+ 'job_data': job_dict
+ }
+
+ insert = self.jobs_t.insert().values(**row_dict)
+ try:
+ self.engine.execute(insert)
+ except IntegrityError:
+ raise ConflictingIdError(job.id)
+
+ def update_job(self, id, changes):
+ selectable = select([self.jobs_t]).where(self.jobs_t.c.id == id)
+ row = self.engine.execute(selectable).fetchone()
+ if row is None:
+ raise JobLookupError(id)
+ job_dict = dict(row.items())['job_data']
+
+ row_changes = {}
+ if 'id' in changes:
+ row_changes['id'] = changes.pop('id')
+ if 'next_run_time' in changes:
+ row_changes['next_run_time'] = tzaware_to_utc(changes['next_run_time'])
+ if changes:
+ job_dict.update(changes)
+ row_changes['job_data'] = job_dict
+
+ update = self.jobs_t.update().where(self.jobs_t.c.id == id).values(**row_changes)
+ try:
+ self.engine.execute(update)
+ except IntegrityError:
+ raise ConflictingIdError(row_changes['id'])
+
+ def remove_job(self, id):
+ delete = self.jobs_t.delete().where(self.jobs_t.c.id == id)
+ result = self.engine.execute(delete)
+ if result.rowcount == 0:
+ raise JobLookupError(id)
- def remove_job(self, job):
- delete = self.jobs_t.delete().where(self.jobs_t.c.id == job.id)
+ def remove_all_jobs(self):
+ delete = self.jobs_t.delete()
self.engine.execute(delete)
- self.jobs.remove(job)
- def load_jobs(self):
+ def close(self):
+ self.engine.dispose()
+
+ def _reconstitute_job(self, row):
+ job = Job.__new__(Job)
+ job_dict = dict(row.items())
+ job_dict.update(job_dict.pop('job_data'))
+ job_dict['next_run_time'] = utc_to_tzaware(job_dict['next_run_time'], self.scheduler.timezone)
+ job.__setstate__(job_dict)
+ return job
+
+ def _get_jobs(self, selectable):
jobs = []
- for row in self.engine.execute(select([self.jobs_t])):
+ for row in self.engine.execute(selectable):
try:
- job = Job.__new__(Job)
- job_dict = dict(row.items())
- job.__setstate__(job_dict)
+ job = self._reconstitute_job(row)
jobs.append(job)
except Exception:
- job_name = job_dict.get('name', '(unknown)')
- logger.exception('Unable to restore job "%s"', job_name)
- self.jobs = jobs
-
- def update_job(self, job):
- job_dict = job.__getstate__()
- update = self.jobs_t.update().where(self.jobs_t.c.id == job.id).\
- values(next_run_time=job_dict['next_run_time'], runs=job_dict['runs'])
- self.engine.execute(update)
-
- def close(self):
- self.engine.dispose()
+ logger.exception(six.u('Unable to restore job (id=%s)'), row.id)
+ return jobs
def __repr__(self):
return '<%s (url=%s)>' % (self.__class__.__name__, self.engine.url)