summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.travis.yml2
-rw-r--r--README.rst1
-rw-r--r--apscheduler/jobstores/zookeeper.py176
-rw-r--r--docs/contributing.rst1
-rw-r--r--examples/jobstores/zookeeper.py33
-rw-r--r--setup.py3
-rw-r--r--tests/test_jobstores.py62
-rw-r--r--tox.ini5
8 files changed, 274 insertions, 9 deletions
diff --git a/.travis.yml b/.travis.yml
index 67c639f..4451f96 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -16,8 +16,10 @@ python:
before_install:
- docker pull mongo:latest
- docker pull rethinkdb:latest
+ - docker pull jplock/zookeeper
- docker run -d -p 127.0.0.1:27017:27017 mongo
- docker run -d -p 127.0.0.1:28015:28015 rethinkdb
+ - docker run -d -p 127.0.0.1:2181:2181 -p 127.0.0.1:2888:2888 -p 127.0.0.1:3888:3888 jplock/zookeeper
install: pip install tox-travis coveralls
diff --git a/README.rst b/README.rst
index efb4979..f45c697 100644
--- a/README.rst
+++ b/README.rst
@@ -29,6 +29,7 @@ Supported backends for storing jobs include:
* `SQLAlchemy <http://www.sqlalchemy.org/>`_ (any RDBMS supported by SQLAlchemy works)
* `MongoDB <http://www.mongodb.org/>`_
* `Redis <http://redis.io/>`_
+* `Zookeeper <https://zookeeper.apache.org/>`_
APScheduler also integrates with several common Python frameworks, like:
diff --git a/apscheduler/jobstores/zookeeper.py b/apscheduler/jobstores/zookeeper.py
new file mode 100644
index 0000000..f800b37
--- /dev/null
+++ b/apscheduler/jobstores/zookeeper.py
@@ -0,0 +1,176 @@
+from __future__ import absolute_import
+import os
+from datetime import datetime
+from pytz import utc
+from kazoo.exceptions import NoNodeError, NodeExistsError
+from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
+from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime
+from apscheduler.job import Job
+
+try:
+ import cPickle as pickle
+except ImportError: # pragma: nocover
+ import pickle
+
+try:
+ from kazoo.client import KazooClient
+except ImportError: # pragma: nocover
+ raise ImportError('ZookeeperJobStore requires Kazoo installed')
+
+
+class ZookeeperJobStore(BaseJobStore):
+    """
+    Stores jobs in a Zookeeper tree. Any leftover keyword arguments are directly passed to
+    kazoo's `KazooClient
+    <http://kazoo.readthedocs.io/en/latest/api/client.html>`_.
+
+    Plugin alias: ``zookeeper``
+
+    :param str path: path to store jobs in
+    :param client: a :class:`~kazoo.client.KazooClient` instance to use instead of
+        providing connection arguments
+    :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
+        highest available
+    """
+
+    def __init__(self, path='/apscheduler', client=None, close_connection_on_exit=False,
+                 pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
+        super(ZookeeperJobStore, self).__init__()
+        self.pickle_protocol = pickle_protocol
+        self.close_connection_on_exit = close_connection_on_exit
+
+        if not path:
+            raise ValueError('The "path" parameter must not be empty')
+
+        self.path = path
+
+        if client:
+            self.client = maybe_ref(client)
+        else:
+            self.client = KazooClient(**connect_args)
+        self._ensured_path = False  # lazily created on first access so __init__ needs no connection
+
+    def _ensure_paths(self):
+        # Create the base znode once per connection; reset by remove_all_jobs().
+        if not self._ensured_path:
+            self.client.ensure_path(self.path)
+            self._ensured_path = True
+
+    def start(self, scheduler, alias):
+        super(ZookeeperJobStore, self).start(scheduler, alias)
+        if not self.client.connected:
+            self.client.start()
+
+    def lookup_job(self, job_id):
+        self._ensure_paths()
+        node_path = os.path.join(self.path, str(job_id))  # str() for parity with add/update/remove
+        try:
+            content, _ = self.client.get(node_path)
+            doc = pickle.loads(content)
+            job = self._reconstitute_job(doc['job_state'])
+            return job
+        except Exception:  # missing or corrupt node -> "not found"; never trap SystemExit etc.
+            return None
+
+    def get_due_jobs(self, now):
+        timestamp = datetime_to_utc_timestamp(now)
+        jobs = [job_def['job'] for job_def in self._get_jobs()
+                if job_def['next_run_time'] is not None and job_def['next_run_time'] <= timestamp]
+        return jobs
+
+    def get_next_run_time(self):
+        next_runs = [job_def['next_run_time'] for job_def in self._get_jobs()
+                     if job_def['next_run_time'] is not None]
+        return utc_timestamp_to_datetime(min(next_runs)) if len(next_runs) > 0 else None
+
+    def get_all_jobs(self):
+        jobs = [job_def['job'] for job_def in self._get_jobs()]
+        self._fix_paused_jobs_sorting(jobs)
+        return jobs
+
+    def add_job(self, job):
+        self._ensure_paths()
+        node_path = os.path.join(self.path, str(job.id))
+        value = {
+            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
+            'job_state': job.__getstate__()
+        }
+        data = pickle.dumps(value, self.pickle_protocol)
+        try:
+            self.client.create(node_path, value=data)
+        except NodeExistsError:
+            raise ConflictingIdError(job.id)
+
+    def update_job(self, job):
+        self._ensure_paths()
+        node_path = os.path.join(self.path, str(job.id))
+        changes = {
+            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
+            'job_state': job.__getstate__()
+        }
+        data = pickle.dumps(changes, self.pickle_protocol)
+        try:
+            self.client.set(node_path, value=data)
+        except NoNodeError:
+            raise JobLookupError(job.id)
+
+    def remove_job(self, job_id):
+        self._ensure_paths()
+        node_path = os.path.join(self.path, str(job_id))
+        try:
+            self.client.delete(node_path)
+        except NoNodeError:
+            raise JobLookupError(job_id)
+
+    def remove_all_jobs(self):
+        try:
+            self.client.delete(self.path, recursive=True)
+        except NoNodeError:
+            pass
+        self._ensured_path = False
+
+    def shutdown(self):
+        if self.close_connection_on_exit:
+            self.client.stop()
+            self.client.close()
+
+    def _reconstitute_job(self, job_state):
+        # Rebuild a Job without invoking __init__; all state comes from the stored dict.
+        job = Job.__new__(Job)
+        job.__setstate__(job_state)
+        job._scheduler = self._scheduler
+        job._jobstore_alias = self._alias
+        return job
+
+    def _get_jobs(self):
+        self._ensure_paths()
+        jobs = []
+        failed_job_ids = []
+        all_ids = self.client.get_children(self.path)
+        for node_name in all_ids:
+            try:
+                node_path = os.path.join(self.path, node_name)
+                content, stat = self.client.get(node_path)  # stat carries the znode metadata
+                doc = pickle.loads(content)
+                job_def = {
+                    'job_id': node_name,
+                    'next_run_time': doc['next_run_time'] if doc['next_run_time'] else None,
+                    'job_state': doc['job_state'],
+                    'job': self._reconstitute_job(doc['job_state']),
+                    'creation_time': stat.ctime  # znode creation time breaks sort ties
+                }
+                jobs.append(job_def)
+            except Exception:  # unpicklable/stale node: log, then purge it below
+                self._logger.exception('Unable to restore job "%s" -- removing it' % node_name)
+                failed_job_ids.append(node_name)
+
+        # Remove all the jobs we failed to restore
+        if failed_job_ids:
+            for failed_id in failed_job_ids:
+                self.remove_job(failed_id)
+        paused_sort_key = datetime(9999, 12, 31, tzinfo=utc)
+        return sorted(jobs, key=lambda job_def: (job_def['job'].next_run_time or paused_sort_key,
+                                                 job_def['creation_time']))
+
+    def __repr__(self):
+        # repr must be side-effect free (no logging): debuggers call it at arbitrary times.
+        return '<%s (client=%s)>' % (self.__class__.__name__, self.client)
diff --git a/docs/contributing.rst b/docs/contributing.rst
index 41cae5f..7b63c2c 100644
--- a/docs/contributing.rst
+++ b/docs/contributing.rst
@@ -30,6 +30,7 @@ To fully run the test suite, you will need at least:
* A MongoDB server
* A Redis server
+ * A Zookeeper server
For other dependencies, it's best to look in tox.ini and install what is appropriate for the Python version you're
using.
diff --git a/examples/jobstores/zookeeper.py b/examples/jobstores/zookeeper.py
new file mode 100644
index 0000000..12b3e42
--- /dev/null
+++ b/examples/jobstores/zookeeper.py
@@ -0,0 +1,33 @@
+"""
+This example demonstrates the use of the Zookeeper job store.
+On each run, it adds a new alarm that fires after ten seconds.
+You can exit the program, restart it and observe that any previous alarms that have not fired yet
+are still active. Running the example with the --clear switch will remove any existing alarms.
+"""
+
+import os
+import sys
+from datetime import datetime, timedelta
+
+from apscheduler.schedulers.blocking import BlockingScheduler
+
+
+def alarm(time):
+    print('Alarm! This alarm was scheduled at %s.' % time)
+
+
+if __name__ == '__main__':
+    scheduler = BlockingScheduler()
+    scheduler.add_jobstore('zookeeper', path='/example_jobs')
+    if len(sys.argv) > 1 and sys.argv[1] == '--clear':
+        scheduler.remove_all_jobs()
+
+    run_at = datetime.now() + timedelta(seconds=10)
+    scheduler.add_job(alarm, 'date', run_date=run_at, args=[datetime.now()])
+    print('To clear the alarms, run this example with the --clear argument.')
+    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
+
+    try:
+        scheduler.start()
+    except (KeyboardInterrupt, SystemExit):
+        pass
diff --git a/setup.py b/setup.py
index 36e36b5..26d4d8f 100644
--- a/setup.py
+++ b/setup.py
@@ -65,7 +65,8 @@ setup(
'sqlalchemy = apscheduler.jobstores.sqlalchemy:SQLAlchemyJobStore',
'mongodb = apscheduler.jobstores.mongodb:MongoDBJobStore',
'rethinkdb = apscheduler.jobstores.rethinkdb:RethinkDBJobStore',
- 'redis = apscheduler.jobstores.redis:RedisJobStore'
+ 'redis = apscheduler.jobstores.redis:RedisJobStore',
+ 'zookeeper = apscheduler.jobstores.zookeeper:ZookeeperJobStore'
]
}
)
diff --git a/tests/test_jobstores.py b/tests/test_jobstores.py
index 0a56d57..3949763 100644
--- a/tests/test_jobstores.py
+++ b/tests/test_jobstores.py
@@ -64,16 +64,26 @@ def redisjobstore():
store.shutdown()
-@pytest.fixture(params=[
- 'memjobstore', 'sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore', 'rethinkdbjobstore'
-], ids=['memory', 'sqlalchemy', 'mongodb', 'redis', 'rethinkdb'])
+@pytest.yield_fixture
+def zookeeperjobstore():
+ zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper')
+ store = zookeeper.ZookeeperJobStore(path='/apscheduler_unittest')
+ store.start(None, 'zookeeper')
+ yield store
+ store.remove_all_jobs()
+ store.shutdown()
+
+
+@pytest.fixture(params=['memjobstore', 'sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore',
+ 'rethinkdbjobstore', 'zookeeperjobstore'],
+ ids=['memory', 'sqlalchemy', 'mongodb', 'redis', 'rethinkdb', 'zookeeper'])
def jobstore(request):
return request.getfuncargvalue(request.param)
-@pytest.fixture(params=[
- 'sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore', 'rethinkdbjobstore'
-], ids=['sqlalchemy', 'mongodb', 'redis', 'rethinkdb'])
+@pytest.fixture(params=['sqlalchemyjobstore', 'mongodbjobstore', 'redisjobstore',
+ 'rethinkdbjobstore', 'zookeeperjobstore'],
+ ids=['sqlalchemy', 'mongodb', 'redis', 'rethinkdb', 'zookeeper'])
def persistent_jobstore(request):
return request.getfuncargvalue(request.param)
@@ -269,6 +279,11 @@ def test_repr_redisjobstore(redisjobstore):
assert repr(redisjobstore) == '<RedisJobStore>'
+def test_repr_zookeeperjobstore(zookeeperjobstore):
+ class_sig = "<ZookeeperJobStore (client=<kazoo.client.KazooClient"
+ assert repr(zookeeperjobstore).startswith(class_sig)
+
+
def test_memstore_close(memjobstore, create_add_job):
create_add_job(memjobstore, dummy_job, datetime(2016, 5, 3))
memjobstore.shutdown()
@@ -303,6 +318,35 @@ def test_mongodb_client_ref():
del mongodb_client
+def test_zookeeper_client_ref():
+ global zookeeper_client
+ zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper')
+ zookeeper_client = zookeeper.KazooClient()
+ try:
+ zookeeperjobstore = zookeeper.ZookeeperJobStore(client='%s:zookeeper_client' % __name__)
+ zookeeperjobstore.start(None, 'zookeeper')
+ zookeeperjobstore.shutdown()
+ assert zookeeper_client.connected is True
+ finally:
+ zookeeper_client.stop()
+ zookeeper_client.close()
+ del zookeeper_client
+
+
+def test_zookeeper_client_keep_open():
+ global zookeeper_client
+ zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper')
+ zookeeper_client = zookeeper.KazooClient()
+ try:
+ zookeeperjobstore = zookeeper.ZookeeperJobStore(client='%s:zookeeper_client' % __name__,
+ close_connection_on_exit=True)
+ zookeeperjobstore.start(None, 'zookeeper')
+ zookeeperjobstore.shutdown()
+ assert zookeeper_client.connected is False
+ finally:
+ del zookeeper_client
+
+
def test_mongodb_null_database():
mongodb = pytest.importorskip('apscheduler.jobstores.mongodb')
exc = pytest.raises(ValueError, mongodb.MongoDBJobStore, database='')
@@ -313,3 +357,9 @@ def test_mongodb_null_collection():
mongodb = pytest.importorskip('apscheduler.jobstores.mongodb')
exc = pytest.raises(ValueError, mongodb.MongoDBJobStore, collection='')
assert '"collection"' in str(exc.value)
+
+
+def test_zookeeper_null_path():
+ zookeeper = pytest.importorskip('apscheduler.jobstores.zookeeper')
+ exc = pytest.raises(ValueError, zookeeper.ZookeeperJobStore, path='')
+ assert '"path"' in str(exc.value)
diff --git a/tox.ini b/tox.ini
index b35b15a..7668afc 100644
--- a/tox.ini
+++ b/tox.ini
@@ -14,8 +14,9 @@ commands = py.test {posargs}
deps = pytest
pytest-cov
pytest-catchlog
- sqlalchemy
- pymongo
+ sqlalchemy
+ pymongo
+ kazoo
redis
rethinkdb
tornado