summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-07-25 16:37:39 +0000
committerGerrit Code Review <review@openstack.org>2016-07-25 16:37:39 +0000
commitfac03addd2bd53a84e8be4e02173c1496de0affe (patch)
tree8505b081d72a379571978c9a09728bd80da03441
parent81ef45bd86ed72f8d74cb867a3fed1f342c753c9 (diff)
parent35a9305f172ed8970caf1ff5cec261df7d3fe9ce (diff)
downloadtaskflow-fac03addd2bd53a84e8be4e02173c1496de0affe.tar.gz
Merge "Ensure the fetching jobs does not fetch anything when in bad state"
-rw-r--r--taskflow/jobs/backends/impl_zookeeper.py74
1 files changed, 66 insertions, 8 deletions
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py
index 6bdcbdd..5a446cb 100644
--- a/taskflow/jobs/backends/impl_zookeeper.py
+++ b/taskflow/jobs/backends/impl_zookeeper.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import collections
import contextlib
import functools
import sys
@@ -23,6 +24,7 @@ import fasteners
import futurist
from kazoo import exceptions as k_exceptions
from kazoo.protocol import paths as k_paths
+from kazoo.protocol import states as k_states
from kazoo.recipe import watchers
from oslo_serialization import jsonutils
from oslo_utils import excutils
@@ -261,6 +263,19 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
#: Default znode path used for jobs (data, locks...).
DEFAULT_PATH = "/taskflow/jobs"
+ STATE_HISTORY_LENGTH = 2
+ """
+ Number of prior state changes to keep a history of, mainly useful
+ for history tracking and debugging connectivity issues.
+ """
+
+ NO_FETCH_STATES = (k_states.KazooState.LOST, k_states.KazooState.SUSPENDED)
+ """
+ Client states underwhich we return empty lists from fetching routines,
+ during these states the underlying connection either is being recovered
+ or may be recovered (aka, it has not full disconnected).
+ """
+
def __init__(self, name, conf,
client=None, persistence=None, emit_notifications=True):
super(ZookeeperJobBoard, self).__init__(name, conf)
@@ -298,6 +313,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
self._worker = None
self._emit_notifications = bool(emit_notifications)
self._connected = False
+ self._suspended = False
+ self._last_states = collections.deque(maxlen=self.STATE_HISTORY_LENGTH)
def _try_emit(self, state, details):
# Submit the work to the executor to avoid blocking the kazoo threads
@@ -334,10 +351,25 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
return len(self._known_jobs)
def _fetch_jobs(self, ensure_fresh=False):
- if ensure_fresh:
- self._force_refresh()
- with self._job_cond:
- return sorted(six.itervalues(self._known_jobs), reverse=True)
+ try:
+ last_state = self._last_states[0]
+ except IndexError:
+ last_state = None
+ if last_state in self.NO_FETCH_STATES:
+ # NOTE(harlowja): on lost clear out all known jobs (from the
+ # in-memory mapping) as we can not safely assume there are any
+ # jobs to continue working on in this state.
+ if last_state == k_states.KazooState.LOST and self._known_jobs:
+ # This will force the jobboard to drop all (in-memory) jobs
+ # that are not in this list (pretty much simulating what
+ # would happen if a jobboard data directory was emptied).
+ self._on_job_posting([], delayed=False)
+ return []
+ else:
+ if ensure_fresh:
+ self._force_refresh()
+ with self._job_cond:
+ return sorted(six.itervalues(self._known_jobs))
def _force_refresh(self):
try:
@@ -364,12 +396,15 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
def _remove_job(self, path):
if path not in self._known_jobs:
- return
+ return False
with self._job_cond:
job = self._known_jobs.pop(path, None)
if job is not None:
LOG.debug("Removed job that was at path '%s'", path)
self._try_emit(base.REMOVAL, details={'job': job})
+ return True
+ else:
+ return False
def _process_child(self, path, request, quiet=True):
"""Receives the result of a child data fetch request."""
@@ -456,8 +491,13 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
investigate_paths.append(path)
if pending_removals:
with self._job_cond:
- for path in pending_removals:
- self._remove_job(path)
+ am_removed = 0
+ try:
+ for path in pending_removals:
+ am_removed += int(self._remove_job(path))
+ finally:
+ if am_removed:
+ self._job_cond.notify_all()
for path in investigate_paths:
# Fire off the request to populate this job.
#
@@ -694,7 +734,24 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
kazoo_utils.checked_commit(txn)
def _state_change_listener(self, state):
- LOG.debug("Kazoo client has changed to state: %s", state)
+ if self._last_states:
+ LOG.debug("Kazoo client has changed to"
+ " state '%s' from prior states '%s'", state,
+ self._last_states)
+ else:
+ LOG.debug("Kazoo client has changed to state '%s' (from"
+ " its initial/uninitialized state)", state)
+ self._last_states.appendleft(state)
+ if state == k_states.KazooState.LOST:
+ self._connected = False
+ LOG.warn("Connection to zookeeper has been lost")
+ elif state == k_states.KazooState.SUSPENDED:
+ LOG.warn("Connection to zookeeper has been suspended")
+ self._suspended = True
+ else:
+ # Must be CONNECTED then (as there are only 3 enums)
+ if self._suspended:
+ self._suspended = False
def wait(self, timeout=None):
# Wait until timeout expires (or forever) for jobs to appear.
@@ -738,6 +795,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
self._known_jobs.clear()
LOG.debug("Stopped & cleared local state")
self._connected = False
+ self._last_states.clear()
@fasteners.locked(lock='_open_close_lock')
def connect(self, timeout=10.0):