diff options
author | Alex Grönholm <alex.gronholm@nextday.fi> | 2019-03-18 10:59:01 +0200 |
---|---|---|
committer | Alex Grönholm <alex.gronholm@nextday.fi> | 2019-03-18 11:10:31 +0200 |
commit | 8b951a658de3710f6fe3f4b1257ecad9e5f3f637 (patch) | |
tree | 657e119c19227fcb36711a0b83d2379b41b7ff7a | |
parent | 3d8cd86f7898ea14acec5c889372220d170314d1 (diff) | |
download | apscheduler-8b951a658de3710f6fe3f4b1257ecad9e5f3f637.tar.gz |
Adapted RethinkDBJobStore to rethinkdb library v2.4+
-rw-r--r-- | apscheduler/jobstores/rethinkdb.py | 38 | ||||
-rw-r--r-- | docs/versionhistory.rst | 1 | ||||
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | tests/test_jobstores.py | 2 |
4 files changed, 23 insertions, 20 deletions
diff --git a/apscheduler/jobstores/rethinkdb.py b/apscheduler/jobstores/rethinkdb.py index 5d15664..d8a78cd 100644 --- a/apscheduler/jobstores/rethinkdb.py +++ b/apscheduler/jobstores/rethinkdb.py @@ -10,7 +10,7 @@ except ImportError: # pragma: nocover import pickle try: - import rethinkdb as r + from rethinkdb import RethinkDB except ImportError: # pragma: nocover raise ImportError('RethinkDBJobStore requires rethinkdb installed') @@ -40,10 +40,12 @@ class RethinkDBJobStore(BaseJobStore): raise ValueError('The "table" parameter must not be empty') self.database = database - self.table = table + self.table_name = table + self.table = None self.client = client self.pickle_protocol = pickle_protocol self.connect_args = connect_args + self.r = RethinkDB() self.conn = None def start(self, scheduler, alias): @@ -52,31 +54,31 @@ class RethinkDBJobStore(BaseJobStore): if self.client: self.conn = maybe_ref(self.client) else: - self.conn = r.connect(db=self.database, **self.connect_args) + self.conn = self.r.connect(db=self.database, **self.connect_args) - if self.database not in r.db_list().run(self.conn): - r.db_create(self.database).run(self.conn) + if self.database not in self.r.db_list().run(self.conn): + self.r.db_create(self.database).run(self.conn) - if self.table not in r.table_list().run(self.conn): - r.table_create(self.table).run(self.conn) + if self.table_name not in self.r.table_list().run(self.conn): + self.r.table_create(self.table_name).run(self.conn) - if 'next_run_time' not in r.table(self.table).index_list().run(self.conn): - r.table(self.table).index_create('next_run_time').run(self.conn) + if 'next_run_time' not in self.r.table(self.table_name).index_list().run(self.conn): + self.r.table(self.table_name).index_create('next_run_time').run(self.conn) - self.table = r.db(self.database).table(self.table) + self.table = self.r.db(self.database).table(self.table_name) def lookup_job(self, job_id): results = list(self.table.get_all(job_id).pluck('job_state').run(self.conn)) return self._reconstitute_job(results[0]['job_state']) if results else None def get_due_jobs(self, now): - return self._get_jobs(r.row['next_run_time'] <= datetime_to_utc_timestamp(now)) + return self._get_jobs(self.r.row['next_run_time'] <= datetime_to_utc_timestamp(now)) def get_next_run_time(self): results = list( self.table - .filter(r.row['next_run_time'] != None) # noqa - .order_by(r.asc('next_run_time')) + .filter(self.r.row['next_run_time'] != None) # noqa + .order_by(self.r.asc('next_run_time')) .map(lambda x: x['next_run_time']) .limit(1) .run(self.conn) @@ -92,7 +94,7 @@ class RethinkDBJobStore(BaseJobStore): job_dict = { 'id': job.id, 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), - 'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) + 'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) } results = self.table.insert(job_dict).run(self.conn) if results['errors'] > 0: @@ -101,7 +103,7 @@ class RethinkDBJobStore(BaseJobStore): def update_job(self, job): changes = { 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), - 'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) + 'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) } results = self.table.get_all(job.id).update(changes).run(self.conn) skipped = False in map(lambda x: results[x] == 0, results.keys()) @@ -130,8 +132,8 @@ class RethinkDBJobStore(BaseJobStore): def _get_jobs(self, predicate=None): jobs = [] failed_job_ids = [] - query = (self.table.filter(r.row['next_run_time'] != None).filter(predicate) if # noqa - predicate else self.table) + query = (self.table.filter(self.r.row['next_run_time'] != None).filter(predicate) # noqa + if predicate else self.table) query = query.order_by('next_run_time', 'id').pluck('id', 'job_state') for document in query.run(self.conn): @@ -143,7 +145,7 @@ class RethinkDBJobStore(BaseJobStore): # Remove all the jobs we failed to restore if failed_job_ids: - r.expr(failed_job_ids).for_each( + self.r.expr(failed_job_ids).for_each( lambda job_id: self.table.get_all(job_id).delete()).run(self.conn) return jobs diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index b9ffff1..01991e4 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -8,6 +8,7 @@ UNRELEASED ---------- * Adapted ``RedisJobStore`` to v3.0 of the ``redis`` library +* Adapted ``RethinkDBJobStore`` to v2.4 of the ``rethink`` library * Fixed ``DeprecationWarnings`` about ``collections.abc`` on Python 3.7 (PR by Roman Levin) 3.5.3 @@ -49,7 +49,7 @@ setup( 'gevent': ['gevent'], 'mongodb': ['pymongo >= 2.8'], 'redis': ['redis >= 3.0'], - 'rethinkdb': ['rethinkdb'], + 'rethinkdb': ['rethinkdb >= 2.4.0'], 'sqlalchemy': ['sqlalchemy >= 0.8'], 'tornado': ['tornado >= 4.3'], 'twisted': ['twisted'], diff --git a/tests/test_jobstores.py b/tests/test_jobstores.py index 2138a6b..de9bceb 100644 --- a/tests/test_jobstores.py +++ b/tests/test_jobstores.py @@ -49,7 +49,7 @@ def rethinkdbjobstore(): store = rethinkdb.RethinkDBJobStore(database='apscheduler_unittest') store.start(None, 'rethinkdb') yield store - rethinkdb.r.db_drop('apscheduler_unittest').run(store.conn) + store.r.db_drop('apscheduler_unittest').run(store.conn) store.shutdown() |