summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-10-01 23:46:28 +0000
committerGerrit Code Review <review@openstack.org>2014-10-01 23:46:28 +0000
commit75376428186d7794e900ce0ac4df80bfc70078f4 (patch)
treebf5fd89b53a46f863546b0569817e1d19c8f192c
parent70385ed28eb0185b4555f584762ecbcc0bba50c1 (diff)
parentf8d69ffc314cbab4c28a70db3c4f772c6af60fcb (diff)
downloadtaskflow-75376428186d7794e900ce0ac4df80bfc70078f4.tar.gz
Merge "Adjust on_job_posting to not hold the lock while investigating"
-rw-r--r--taskflow/jobs/backends/impl_zookeeper.py54
1 files changed, 33 insertions, 21 deletions
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py
index cc6101c..9630779 100644
--- a/taskflow/jobs/backends/impl_zookeeper.py
+++ b/taskflow/jobs/backends/impl_zookeeper.py
@@ -312,8 +312,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
def _emit(self, state, details):
# Submit the work to the executor to avoid blocking the kazoo queue.
- if self._worker is not None:
+ try:
self._worker.submit(self.notifier.notify, state, details)
+ except (AttributeError, RuntimeError):
+ # Notification thread is shutdown or non-existent, either case we
+ # just want to skip submitting a notification...
+ pass
@property
def path(self):
@@ -382,6 +386,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
else:
self._job_cond.acquire()
try:
+ # Now we can offically check if someone already placed this
+ # jobs information into the known job set (if it's already
+ # existing then just leave it alone).
if path not in self._known_jobs:
job = ZookeeperJob(job_data['name'], self,
self._client, self._persistence, path,
@@ -405,32 +412,37 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
continue
child_paths.append(k_paths.join(self.path, c))
- # Remove jobs that we know about but which are no longer children
+ # Figure out what we really should be investigating and what we
+ # shouldn't...
+ investigate_paths = []
with self._job_lock:
removals = set()
- for path, _job in six.iteritems(self._known_jobs):
+ for path in six.iterkeys(self._known_jobs):
if path not in child_paths:
removals.add(path)
for path in removals:
self._remove_job(path)
-
- # Ensure that we have a job record for each new job that has appeared
- for path in child_paths:
- if path in self._bad_paths:
- continue
- with self._job_lock:
- if path not in self._known_jobs:
- # Fire off the request to populate this job asynchronously.
- #
- # This method is *usually* called from a asynchronous
- # handler so it's better to exit from this quickly to
- # allow other asynchronous handlers to be executed.
- request = self._client.get_async(path)
- child_proc = functools.partial(self._process_child, path)
- if delayed:
- request.rawlink(child_proc)
- else:
- child_proc(request)
+ for path in child_paths:
+ if path in self._bad_paths:
+ continue
+ # This pre-check will not guarantee that we will not already
+ # have the job (if it's being populated elsewhere) but it will
+ # reduce the amount of duplicated requests in general.
+ if path in self._known_jobs:
+ continue
+ if path not in investigate_paths:
+ investigate_paths.append(path)
+ for path in investigate_paths:
+ # Fire off the request to populate this job.
+ #
+ # This method is *usually* called from a asynchronous handler so
+ # it's better to exit from this quickly to allow other asynchronous
+ # handlers to be executed.
+ request = self._client.get_async(path)
+ if delayed:
+ request.rawlink(functools.partial(self._process_child, path))
+ else:
+ self._process_child(path, request)
def post(self, name, book=None, details=None):