diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-10-01 23:46:28 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-10-01 23:46:28 +0000 |
commit | 75376428186d7794e900ce0ac4df80bfc70078f4 (patch) | |
tree | bf5fd89b53a46f863546b0569817e1d19c8f192c | |
parent | 70385ed28eb0185b4555f584762ecbcc0bba50c1 (diff) | |
parent | f8d69ffc314cbab4c28a70db3c4f772c6af60fcb (diff) | |
download | taskflow-75376428186d7794e900ce0ac4df80bfc70078f4.tar.gz |
Merge "Adjust on_job_posting to not hold the lock while investigating"
-rw-r--r-- | taskflow/jobs/backends/impl_zookeeper.py | 54 |
1 files changed, 33 insertions, 21 deletions
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index cc6101c..9630779 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -312,8 +312,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): def _emit(self, state, details): # Submit the work to the executor to avoid blocking the kazoo queue. - if self._worker is not None: + try: self._worker.submit(self.notifier.notify, state, details) + except (AttributeError, RuntimeError): + # Notification thread is shutdown or non-existent, either case we + # just want to skip submitting a notification... + pass @property def path(self): @@ -382,6 +386,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): else: self._job_cond.acquire() try: + # Now we can offically check if someone already placed this + # jobs information into the known job set (if it's already + # existing then just leave it alone). if path not in self._known_jobs: job = ZookeeperJob(job_data['name'], self, self._client, self._persistence, path, @@ -405,32 +412,37 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): continue child_paths.append(k_paths.join(self.path, c)) - # Remove jobs that we know about but which are no longer children + # Figure out what we really should be investigating and what we + # shouldn't... + investigate_paths = [] with self._job_lock: removals = set() - for path, _job in six.iteritems(self._known_jobs): + for path in six.iterkeys(self._known_jobs): if path not in child_paths: removals.add(path) for path in removals: self._remove_job(path) - - # Ensure that we have a job record for each new job that has appeared - for path in child_paths: - if path in self._bad_paths: - continue - with self._job_lock: - if path not in self._known_jobs: - # Fire off the request to populate this job asynchronously. - # - # This method is *usually* called from a asynchronous - # handler so it's better to exit from this quickly to - # allow other asynchronous handlers to be executed. - request = self._client.get_async(path) - child_proc = functools.partial(self._process_child, path) - if delayed: - request.rawlink(child_proc) - else: - child_proc(request) + for path in child_paths: + if path in self._bad_paths: + continue + # This pre-check will not guarantee that we will not already + # have the job (if it's being populated elsewhere) but it will + # reduce the amount of duplicated requests in general. + if path in self._known_jobs: + continue + if path not in investigate_paths: + investigate_paths.append(path) + for path in investigate_paths: + # Fire off the request to populate this job. + # + # This method is *usually* called from a asynchronous handler so + # it's better to exit from this quickly to allow other asynchronous + # handlers to be executed. + request = self._client.get_async(path) + if delayed: + request.rawlink(functools.partial(self._process_child, path)) + else: + self._process_child(path, request) def post(self, name, book=None, details=None): |