diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-07-25 16:37:39 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-07-25 16:37:39 +0000 |
commit | fac03addd2bd53a84e8be4e02173c1496de0affe (patch) | |
tree | 8505b081d72a379571978c9a09728bd80da03441 | |
parent | 81ef45bd86ed72f8d74cb867a3fed1f342c753c9 (diff) | |
parent | 35a9305f172ed8970caf1ff5cec261df7d3fe9ce (diff) | |
download | taskflow-fac03addd2bd53a84e8be4e02173c1496de0affe.tar.gz |
Merge "Ensure the fetching jobs does not fetch anything when in bad state"
-rw-r--r-- | taskflow/jobs/backends/impl_zookeeper.py | 74 |
1 files changed, 66 insertions, 8 deletions
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 6bdcbdd..5a446cb 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import contextlib import functools import sys @@ -23,6 +24,7 @@ import fasteners import futurist from kazoo import exceptions as k_exceptions from kazoo.protocol import paths as k_paths +from kazoo.protocol import states as k_states from kazoo.recipe import watchers from oslo_serialization import jsonutils from oslo_utils import excutils @@ -261,6 +263,19 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): #: Default znode path used for jobs (data, locks...). DEFAULT_PATH = "/taskflow/jobs" + STATE_HISTORY_LENGTH = 2 + """ + Number of prior state changes to keep a history of, mainly useful + for history tracking and debugging connectivity issues. + """ + + NO_FETCH_STATES = (k_states.KazooState.LOST, k_states.KazooState.SUSPENDED) + """ + Client states underwhich we return empty lists from fetching routines, + during these states the underlying connection either is being recovered + or may be recovered (aka, it has not full disconnected). + """ + def __init__(self, name, conf, client=None, persistence=None, emit_notifications=True): super(ZookeeperJobBoard, self).__init__(name, conf) @@ -298,6 +313,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._worker = None self._emit_notifications = bool(emit_notifications) self._connected = False + self._suspended = False + self._last_states = collections.deque(maxlen=self.STATE_HISTORY_LENGTH) def _try_emit(self, state, details): # Submit the work to the executor to avoid blocking the kazoo threads @@ -334,10 +351,25 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): return len(self._known_jobs) def _fetch_jobs(self, ensure_fresh=False): - if ensure_fresh: - self._force_refresh() - with self._job_cond: - return sorted(six.itervalues(self._known_jobs), reverse=True) + try: + last_state = self._last_states[0] + except IndexError: + last_state = None + if last_state in self.NO_FETCH_STATES: + # NOTE(harlowja): on lost clear out all known jobs (from the + # in-memory mapping) as we can not safely assume there are any + # jobs to continue working on in this state. + if last_state == k_states.KazooState.LOST and self._known_jobs: + # This will force the jobboard to drop all (in-memory) jobs + # that are not in this list (pretty much simulating what + # would happen if a jobboard data directory was emptied). + self._on_job_posting([], delayed=False) + return [] + else: + if ensure_fresh: + self._force_refresh() + with self._job_cond: + return sorted(six.itervalues(self._known_jobs)) def _force_refresh(self): try: @@ -364,12 +396,15 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): def _remove_job(self, path): if path not in self._known_jobs: - return + return False with self._job_cond: job = self._known_jobs.pop(path, None) if job is not None: LOG.debug("Removed job that was at path '%s'", path) self._try_emit(base.REMOVAL, details={'job': job}) + return True + else: + return False def _process_child(self, path, request, quiet=True): """Receives the result of a child data fetch request.""" @@ -456,8 +491,13 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): investigate_paths.append(path) if pending_removals: with self._job_cond: - for path in pending_removals: - self._remove_job(path) + am_removed = 0 + try: + for path in pending_removals: + am_removed += int(self._remove_job(path)) + finally: + if am_removed: + self._job_cond.notify_all() for path in investigate_paths: # Fire off the request to populate this job. # @@ -694,7 +734,24 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): kazoo_utils.checked_commit(txn) def _state_change_listener(self, state): - LOG.debug("Kazoo client has changed to state: %s", state) + if self._last_states: + LOG.debug("Kazoo client has changed to" + " state '%s' from prior states '%s'", state, + self._last_states) + else: + LOG.debug("Kazoo client has changed to state '%s' (from" + " its initial/uninitialized state)", state) + self._last_states.appendleft(state) + if state == k_states.KazooState.LOST: + self._connected = False + LOG.warn("Connection to zookeeper has been lost") + elif state == k_states.KazooState.SUSPENDED: + LOG.warn("Connection to zookeeper has been suspended") + self._suspended = True + else: + # Must be CONNECTED then (as there are only 3 enums) + if self._suspended: + self._suspended = False def wait(self, timeout=None): # Wait until timeout expires (or forever) for jobs to appear. @@ -738,6 +795,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._known_jobs.clear() LOG.debug("Stopped & cleared local state") self._connected = False + self._last_states.clear() @fasteners.locked(lock='_open_close_lock') def connect(self, timeout=10.0): |