author     Alex Grönholm <alex.gronholm@nextday.fi>  2019-03-18 10:59:01 +0200
committer  Alex Grönholm <alex.gronholm@nextday.fi>  2019-03-18 11:10:31 +0200
commit     8b951a658de3710f6fe3f4b1257ecad9e5f3f637 (patch)
tree       657e119c19227fcb36711a0b83d2379b41b7ff7a
parent     3d8cd86f7898ea14acec5c889372220d170314d1 (diff)
download   apscheduler-8b951a658de3710f6fe3f4b1257ecad9e5f3f637.tar.gz
Adapted RethinkDBJobStore to rethinkdb library v2.4+
-rw-r--r--  apscheduler/jobstores/rethinkdb.py  | 38
-rw-r--r--  docs/versionhistory.rst             |  1
-rw-r--r--  setup.py                            |  2
-rw-r--r--  tests/test_jobstores.py             |  2
4 files changed, 23 insertions, 20 deletions
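
The driver change behind this patch: starting with rethinkdb 2.4, the Python package no longer acts as the query builder itself; you import RethinkDB and run queries on an instance, which is why every module-level r. in the job store becomes self.r. A minimal before/after sketch (illustrative only, not part of the patch; it assumes a RethinkDB server on the driver's default host/port, and the database name is made up):

# Illustrative only -- not part of the patch.

# rethinkdb < 2.4: the module itself was the query builder.
#   import rethinkdb as r
#   conn = r.connect(db='apscheduler')

# rethinkdb >= 2.4: instantiate RethinkDB and run queries on the instance.
from rethinkdb import RethinkDB

r = RethinkDB()
conn = r.connect(db='apscheduler')
print(r.table_list().run(conn))   # the query API itself is unchanged
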
diff --git a/apscheduler/jobstores/rethinkdb.py b/apscheduler/jobstores/rethinkdb.py
index 5d15664..d8a78cd 100644
--- a/apscheduler/jobstores/rethinkdb.py
+++ b/apscheduler/jobstores/rethinkdb.py
@@ -10,7 +10,7 @@ except ImportError: # pragma: nocover
import pickle
try:
- import rethinkdb as r
+ from rethinkdb import RethinkDB
except ImportError: # pragma: nocover
raise ImportError('RethinkDBJobStore requires rethinkdb installed')
@@ -40,10 +40,12 @@ class RethinkDBJobStore(BaseJobStore):
raise ValueError('The "table" parameter must not be empty')
self.database = database
- self.table = table
+ self.table_name = table
+ self.table = None
self.client = client
self.pickle_protocol = pickle_protocol
self.connect_args = connect_args
+ self.r = RethinkDB()
self.conn = None
def start(self, scheduler, alias):
@@ -52,31 +54,31 @@ class RethinkDBJobStore(BaseJobStore):
if self.client:
self.conn = maybe_ref(self.client)
else:
- self.conn = r.connect(db=self.database, **self.connect_args)
+ self.conn = self.r.connect(db=self.database, **self.connect_args)
- if self.database not in r.db_list().run(self.conn):
- r.db_create(self.database).run(self.conn)
+ if self.database not in self.r.db_list().run(self.conn):
+ self.r.db_create(self.database).run(self.conn)
- if self.table not in r.table_list().run(self.conn):
- r.table_create(self.table).run(self.conn)
+ if self.table_name not in self.r.table_list().run(self.conn):
+ self.r.table_create(self.table_name).run(self.conn)
- if 'next_run_time' not in r.table(self.table).index_list().run(self.conn):
- r.table(self.table).index_create('next_run_time').run(self.conn)
+ if 'next_run_time' not in self.r.table(self.table_name).index_list().run(self.conn):
+ self.r.table(self.table_name).index_create('next_run_time').run(self.conn)
- self.table = r.db(self.database).table(self.table)
+ self.table = self.r.db(self.database).table(self.table_name)
def lookup_job(self, job_id):
results = list(self.table.get_all(job_id).pluck('job_state').run(self.conn))
return self._reconstitute_job(results[0]['job_state']) if results else None
def get_due_jobs(self, now):
- return self._get_jobs(r.row['next_run_time'] <= datetime_to_utc_timestamp(now))
+ return self._get_jobs(self.r.row['next_run_time'] <= datetime_to_utc_timestamp(now))
def get_next_run_time(self):
results = list(
self.table
- .filter(r.row['next_run_time'] != None) # noqa
- .order_by(r.asc('next_run_time'))
+ .filter(self.r.row['next_run_time'] != None) # noqa
+ .order_by(self.r.asc('next_run_time'))
.map(lambda x: x['next_run_time'])
.limit(1)
.run(self.conn)
@@ -92,7 +94,7 @@ class RethinkDBJobStore(BaseJobStore):
job_dict = {
'id': job.id,
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
- 'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
+ 'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
}
results = self.table.insert(job_dict).run(self.conn)
if results['errors'] > 0:
@@ -101,7 +103,7 @@ class RethinkDBJobStore(BaseJobStore):
def update_job(self, job):
changes = {
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
- 'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
+ 'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
}
results = self.table.get_all(job.id).update(changes).run(self.conn)
skipped = False in map(lambda x: results[x] == 0, results.keys())
@@ -130,8 +132,8 @@ class RethinkDBJobStore(BaseJobStore):
def _get_jobs(self, predicate=None):
jobs = []
failed_job_ids = []
- query = (self.table.filter(r.row['next_run_time'] != None).filter(predicate) if # noqa
- predicate else self.table)
+ query = (self.table.filter(self.r.row['next_run_time'] != None).filter(predicate) # noqa
+ if predicate else self.table)
query = query.order_by('next_run_time', 'id').pluck('id', 'job_state')
for document in query.run(self.conn):
@@ -143,7 +145,7 @@ class RethinkDBJobStore(BaseJobStore):
# Remove all the jobs we failed to restore
if failed_job_ids:
- r.expr(failed_job_ids).for_each(
+ self.r.expr(failed_job_ids).for_each(
lambda job_id: self.table.get_all(job_id).delete()).run(self.conn)
return jobs
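
For reference, a minimal usage sketch of the updated store (not part of the patch; it assumes a RethinkDB server reachable with the driver's default connection settings, and the database name and job are illustrative):

# Minimal usage sketch, not part of the patch.
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.rethinkdb import RethinkDBJobStore

jobstore = RethinkDBJobStore(database='apscheduler_demo')
scheduler = BackgroundScheduler()
# The store's start() (invoked when the scheduler starts) creates the
# database, table and next_run_time index if they are missing.
scheduler.add_jobstore(jobstore, 'rethinkdb')
scheduler.add_job(print, 'interval', seconds=10, args=['tick'], jobstore='rethinkdb')
scheduler.start()
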
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index b9ffff1..01991e4 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -8,6 +8,7 @@ UNRELEASED
----------
* Adapted ``RedisJobStore`` to v3.0 of the ``redis`` library
+* Adapted ``RethinkDBJobStore`` to v2.4 of the ``rethinkdb`` library
* Fixed ``DeprecationWarnings`` about ``collections.abc`` on Python 3.7 (PR by Roman Levin)
3.5.3
diff --git a/setup.py b/setup.py
index 92b8da7..6988f74 100644
--- a/setup.py
+++ b/setup.py
@@ -49,7 +49,7 @@ setup(
'gevent': ['gevent'],
'mongodb': ['pymongo >= 2.8'],
'redis': ['redis >= 3.0'],
- 'rethinkdb': ['rethinkdb'],
+ 'rethinkdb': ['rethinkdb >= 2.4.0'],
'sqlalchemy': ['sqlalchemy >= 0.8'],
'tornado': ['tornado >= 4.3'],
'twisted': ['twisted'],
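
The extras pin above only raises the floor for new installs; an existing environment can be checked against it with setuptools' pkg_resources (a quick illustrative snippet, not part of the patch):

# Sanity-check the installed driver against the new floor (illustrative;
# pkg_resources is available wherever setuptools is installed).
from pkg_resources import get_distribution, parse_version

installed = get_distribution('rethinkdb').version
assert parse_version(installed) >= parse_version('2.4.0'), \
    'RethinkDBJobStore now needs rethinkdb >= 2.4.0 (found %s)' % installed
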
diff --git a/tests/test_jobstores.py b/tests/test_jobstores.py
index 2138a6b..de9bceb 100644
--- a/tests/test_jobstores.py
+++ b/tests/test_jobstores.py
@@ -49,7 +49,7 @@ def rethinkdbjobstore():
store = rethinkdb.RethinkDBJobStore(database='apscheduler_unittest')
store.start(None, 'rethinkdb')
yield store
- rethinkdb.r.db_drop('apscheduler_unittest').run(store.conn)
+ store.r.db_drop('apscheduler_unittest').run(store.conn)
store.shutdown()
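
The fixture switches to the store's own RethinkDB() handle because the 2.4 driver no longer provides a usable module-level rethinkdb.r. The same cleanup pattern outside pytest would look roughly like this (illustrative; the database name is taken from the test above):

# Roughly the same setup/teardown pattern outside pytest (illustrative).
from apscheduler.jobstores.rethinkdb import RethinkDBJobStore

store = RethinkDBJobStore(database='apscheduler_unittest')
store.start(None, 'rethinkdb')
try:
    pass  # exercise the store here
finally:
    store.r.db_drop('apscheduler_unittest').run(store.conn)
    store.shutdown()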