diff options
field | value | date |
---|---|---|
author | Jose Ignacio Villar <jvillar@gmail.com> | 2016-07-03 20:21:55 +0200 |
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2016-07-03 21:21:55 +0300 |
commit | 9eb5dc50c4778ee6771c72a53dd70be3c0040fcf (patch) | |
tree | 8958dad581da6f41e1086db8686f6ef246045668 | |
parent | 93c4393244ba6989c48f8b5f42b2571ec719cef2 (diff) | |
download | apscheduler-9eb5dc50c4778ee6771c72a53dd70be3c0040fcf.tar.gz | |
Zookeeper jobstore support (#144)
-rw-r--r-- | .travis.yml | 2 |
-rw-r--r-- | README.rst | 1 |
-rw-r--r-- | apscheduler/jobstores/zookeeper.py | 176 |
-rw-r--r-- | docs/contributing.rst | 1 |
-rw-r--r-- | examples/jobstores/zookeeper.py | 33 |
-rw-r--r-- | setup.py | 3 |
-rw-r--r-- | tests/test_jobstores.py | 62 |
-rw-r--r-- | tox.ini | 5 |
8 files changed, 274 insertions, 9 deletions
diff --git a/.travis.yml b/.travis.yml index 67c639f..4451f96 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,8 +16,10 @@ python: before_install: - docker pull mongo:latest - docker pull rethinkdb:latest + - docker pull jplock/zookeeper - docker run -d -p 127.0.0.1:27017:27017 mongo - docker run -d -p 127.0.0.1:28015:28015 rethinkdb + - docker run -d -p 127.0.0.1:2181:2181 -p 127.0.0.1:2888:2888 -p 127.0.0.1:3888:3888 jplock/zookeeper install: pip install tox-travis coveralls @@ -29,6 +29,7 @@ Supported backends for storing jobs include: * `SQLAlchemy <http://www.sqlalchemy.org/>`_ (any RDBMS supported by SQLAlchemy works) * `MongoDB <http://www.mongodb.org/>`_ * `Redis <http://redis.io/>`_ +* `Zookeeper <https://zookeeper.apache.org/>`_ APScheduler also integrates with several common Python frameworks, like: diff --git a/apscheduler/jobstores/zookeeper.py b/apscheduler/jobstores/zookeeper.py new file mode 100644 index 0000000..f800b37 --- /dev/null +++ b/apscheduler/jobstores/zookeeper.py @@ -0,0 +1,176 @@ +from __future__ import absolute_import +import os +from datetime import datetime +from pytz import utc +from kazoo.exceptions import NoNodeError, NodeExistsError +from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError +from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime +from apscheduler.job import Job + +try: + import cPickle as pickle +except ImportError: # pragma: nocover + import pickle + +try: + from kazoo.client import KazooClient +except ImportError: # pragma: nocover + raise ImportError('ZookeeperJobStore requires Kazoo installed') + + +class ZookeeperJobStore(BaseJobStore): + """ + Stores jobs in a Zookeeper tree. Any leftover keyword arguments are directly passed to + kazoo's `KazooClient + <http://kazoo.readthedocs.io/en/latest/api/client.html>`_. 
+ + Plugin alias: ``zookeeper`` + + :param str path: path to store jobs in + :param client: a :class:`~kazoo.client.KazooClient` instance to use instead of + providing connection arguments + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available + """ + + def __init__(self, path='/apscheduler', client=None, close_connection_on_exit=False, + pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args): + super(ZookeeperJobStore, self).__init__() + self.pickle_protocol = pickle_protocol + self.close_connection_on_exit = close_connection_on_exit + + if not path: + raise ValueError('The "path" parameter must not be empty') + + self.path = path + + if client: + self.client = maybe_ref(client) + else: + self.client = KazooClient(**connect_args) + self._ensured_path = False + + def _ensure_paths(self): + if not self._ensured_path: + self.client.ensure_path(self.path) + self._ensured_path = True + + def start(self, scheduler, alias): + super(ZookeeperJobStore, self).start(scheduler, alias) + if not self.client.connected: + self.client.start() + + def lookup_job(self, job_id): + self._ensure_paths() + node_path = os.path.join(self.path, job_id) + try: + content, _ = self.client.get(node_path) + doc = pickle.loads(content) + job = self._reconstitute_job(doc['job_state']) + return job + except: + return None + + def get_due_jobs(self, now): + timestamp = datetime_to_utc_timestamp(now) + jobs = [job_def['job'] for job_def in self._get_jobs() + if job_def['next_run_time'] is not None and job_def['next_run_time'] <= timestamp] + return jobs + + def get_next_run_time(self): + next_runs = [job_def['next_run_time'] for job_def in self._get_jobs() + if job_def['next_run_time'] is not None] + return utc_timestamp_to_datetime(min(next_runs)) if len(next_runs) > 0 else None + + def get_all_jobs(self): + jobs = [job_def['job'] for job_def in self._get_jobs()] + self._fix_paused_jobs_sorting(jobs) + return jobs + + def add_job(self, 
job): + self._ensure_paths() + node_path = os.path.join(self.path, str(job.id)) + value = { + 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), + 'job_state': job.__getstate__() + } + data = pickle.dumps(value, self.pickle_protocol) + try: + self.client.create(node_path, value=data) + except NodeExistsError: + raise ConflictingIdError(job.id) + + def update_job(self, job): + self._ensure_paths() + node_path = os.path.join(self.path, str(job.id)) + changes = { + 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), + 'job_state': job.__getstate__() + } + data = pickle.dumps(changes, self.pickle_protocol) + try: + self.client.set(node_path, value=data) + except NoNodeError: + raise JobLookupError(job.id) + + def remove_job(self, job_id): + self._ensure_paths() + node_path = os.path.join(self.path, str(job_id)) + try: + self.client.delete(node_path) + except NoNodeError: + raise JobLookupError(job_id) + + def remove_all_jobs(self): + try: + self.client.delete(self.path, recursive=True) + except NoNodeError: + pass + self._ensured_path = False + + def shutdown(self): + if self.close_connection_on_exit: + self.client.stop() + self.client.close() + + def _reconstitute_job(self, job_state): + job_state = job_state + job = Job.__new__(Job) + job.__setstate__(job_state) + job._scheduler = self._scheduler + job._jobstore_alias = self._alias + return job + + def _get_jobs(self): + self._ensure_paths() + jobs = [] + failed_job_ids = [] + all_ids = self.client.get_children(self.path) + for node_name in all_ids: + try: + node_path = os.path.join(self.path, node_name) + content, _ = self.client.get(node_path) + doc = pickle.loads(content) + job_def = { + 'job_id': node_name, + 'next_run_time': doc['next_run_time'] if doc['next_run_time'] else None, + 'job_state': doc['job_state'], + 'job': self._reconstitute_job(doc['job_state']), + 'creation_time': _.ctime + } + jobs.append(job_def) + except: + self._logger.exception('Unable to restore job "%s" -- removing 
it' % node_name) + failed_job_ids.append(node_name) + + # Remove all the jobs we failed to restore + if failed_job_ids: + for failed_id in failed_job_ids: + self.remove_job(failed_id) + paused_sort_key = datetime(9999, 12, 31, tzinfo=utc) + return sorted(jobs, key=lambda job_def: (job_def['job'].next_run_time or paused_sort_key, + job_def['creation_time'])) + + def __repr__(self): + self._logger.exception('<%s (client=%s)>' % (self.__class__.__name__, self.client)) + return '<%s (client=%s)>' % (self.__class__.__name__, self.client) diff --git a/docs/contributing.rst b/docs/contributing.rst index 41cae5f..7b63c2c 100644 --- a/docs/contributing.rst +++ b/docs/contributing.rst @@ -30,6 +30,7 @@ To fully run the test suite, you will need at least: * A MongoDB server * A Redis server + * A Zookeeper server For other dependencies, it's best to look in tox.ini and install what is appropriate for the Python version you're using. diff --git a/examples/jobstores/zookeeper.py b/examples/jobstores/zookeeper.py new file mode 100644 index 0000000..12b3e42 --- /dev/null +++ b/examples/jobstores/zookeeper.py @@ -0,0 +1,33 @@ +""" +This example demonstrates the use of the Zookeeper job store. +On each run, it adds a new alarm that fires after ten seconds. +You can exit the program, restart it and observe that any previous alarms that have not fired yet +are still active. Running the example with the --clear switch will remove any existing alarms. +""" + +from datetime import datetime, timedelta +import sys +import os + +from apscheduler.schedulers.blocking import BlockingScheduler + + +def alarm(time): + print('Alarm! This alarm was scheduled at %s.' 
% time) + + +if __name__ == '__main__': + scheduler = BlockingScheduler() + scheduler.add_jobstore('zookeeper', path='/example_jobs') + if len(sys.argv) > 1 and sys.argv[1] == '--clear': + scheduler.remove_all_jobs() + + alarm_time = datetime.now() + timedelta(seconds=10) + scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()]) + print('To clear the alarms, run this example with the --clear argument.') + print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) + + try: + scheduler.start() + except (KeyboardInterrupt, SystemExit): + pass @@ -65,7 +65,8 @@ setup( 'sqlalchemy = apscheduler.jobstores.sqlalchemy:SQLAlchemyJobStore', 'mongodb = apscheduler.jobstores.mongodb:MongoDBJobStore', 'rethinkdb = apscheduler.jobstores.rethinkdb:RethinkDBJobStore', - 'redis = apscheduler.jobstores.redis:RedisJobStore' + 'redis = apscheduler.jobstores.redis:RedisJobStore', + 'zookeeper = apscheduler.jobstores.zookeeper:ZookeeperJobStore' ] } ) diff --git a/tests/test_jobstores.py b/tests/test_jobstores.py index 0a56d57..3949763 100644 --- a/tests/test_jobstores.py +++ b/tests/test_jobstores.py @@ -64,16 +64,26 @@ def redisjobstore(): store.shutdown() -@pytest.fixture(params=[ - 'memjobstore', 'sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore', 'rethinkdbjobstore' -], ids=['memory', 'sqlalchemy', 'mongodb', 'redis', 'rethinkdb']) +@pytest.yield_fixture +def zookeeperjobstore(): + zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper') + store = zookeeper.ZookeeperJobStore(path='/apscheduler_unittest') + store.start(None, 'zookeeper') + yield store + store.remove_all_jobs() + store.shutdown() + + +@pytest.fixture(params=['memjobstore', 'sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore', + 'rethinkdbjobstore', 'zookeeperjobstore'], + ids=['memory', 'sqlalchemy', 'mongodb', 'redis', 'rethinkdb', 'zookeeper']) def jobstore(request): return request.getfuncargvalue(request.param) -@pytest.fixture(params=[ - 
'sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore', 'rethinkdbjobstore' -], ids=['sqlalchemy', 'mongodb', 'redis', 'rethinkdb']) +@pytest.fixture(params=['sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore', + 'rethinkdbjobstore', 'zookeeperjobstore'], + ids=['sqlalchemy', 'mongodb', 'redis', 'rethinkdb', 'zookeeper']) def persistent_jobstore(request): return request.getfuncargvalue(request.param) @@ -269,6 +279,11 @@ def test_repr_redisjobstore(redisjobstore): assert repr(redisjobstore) == '<RedisJobStore>' +def test_repr_zookeeperjobstore(zookeeperjobstore): + class_sig = "<ZookeeperJobStore (client=<kazoo.client.KazooClient" + assert repr(zookeeperjobstore).startswith(class_sig) + + def test_memstore_close(memjobstore, create_add_job): create_add_job(memjobstore, dummy_job, datetime(2016, 5, 3)) memjobstore.shutdown() @@ -303,6 +318,35 @@ def test_mongodb_client_ref(): del mongodb_client +def test_zookeeper_client_ref(): + global zookeeper_client + zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper') + zookeeper_client = zookeeper.KazooClient() + try: + zookeeperjobstore = zookeeper.ZookeeperJobStore(client='%s:zookeeper_client' % __name__) + zookeeperjobstore.start(None, 'zookeeper') + zookeeperjobstore.shutdown() + assert zookeeper_client.connected is True + finally: + zookeeper_client.stop() + zookeeper_client.close() + del zookeeper_client + + +def test_zookeeper_client_keep_open(): + global zookeeper_client + zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper') + zookeeper_client = zookeeper.KazooClient() + try: + zookeeperjobstore = zookeeper.ZookeeperJobStore(client='%s:zookeeper_client' % __name__, + close_connection_on_exit=True) + zookeeperjobstore.start(None, 'zookeeper') + zookeeperjobstore.shutdown() + assert zookeeper_client.connected is False + finally: + del zookeeper_client + + def test_mongodb_null_database(): mongodb = pytest.importorskip('apscheduler.jobstores.mongodb') exc = pytest.raises(ValueError, 
mongodb.MongoDBJobStore, database='') @@ -313,3 +357,9 @@ def test_mongodb_null_collection(): mongodb = pytest.importorskip('apscheduler.jobstores.mongodb') exc = pytest.raises(ValueError, mongodb.MongoDBJobStore, collection='') assert '"collection"' in str(exc.value) + + +def test_zookeeper_null_path(): + zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper') + exc = pytest.raises(ValueError, zookeeper.ZookeeperJobStore, path='') + assert '"path"' in str(exc.value) @@ -14,8 +14,9 @@ commands = py.test {posargs} deps = pytest pytest-cov pytest-catchlog - sqlalchemy - pymongo + sqlalchemy + pymongo + kazoo redis rethinkdb tornado |