diff options
author | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-07-16 19:53:45 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@gmail.com> | 2014-09-27 14:51:38 -0700 |
commit | f8d69ffc314cbab4c28a70db3c4f772c6af60fcb (patch) | |
tree | 8f3e675443bab5ad30a93bd2bd4ab2a48ec7f609 | |
parent | d43cc4f9c33a37d6a3a3f1a1eec7219a0804767b (diff) | |
download | taskflow-f8d69ffc314cbab4c28a70db3c4f772c6af60fcb.tar.gz |
Adjust on_job_posting to not hold the lock while investigating
To avoid issues when investigating jobs in one thread and having
a dispatcher thread get locked on that same investigation lock avoid
this entirely by not holding onto that lock.
This also adds a small cleanup around the notifier thread pool and
ensures that it actually exists in a useable state (and handles the
exceptions that arise when it doesn't) on submission.
Change-Id: I12e71f029726ec54ec0aadbdf71cc3c7a959f047
-rw-r--r-- | taskflow/jobs/backends/impl_zookeeper.py | 54 |
1 files changed, 33 insertions, 21 deletions
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index cc6101c..9630779 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -312,8 +312,12 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): def _emit(self, state, details): # Submit the work to the executor to avoid blocking the kazoo queue. - if self._worker is not None: + try: self._worker.submit(self.notifier.notify, state, details) + except (AttributeError, RuntimeError): + # Notification thread is shutdown or non-existent, either case we + # just want to skip submitting a notification... + pass @property def path(self): @@ -382,6 +386,9 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): else: self._job_cond.acquire() try: + # Now we can offically check if someone already placed this + # jobs information into the known job set (if it's already + # existing then just leave it alone). if path not in self._known_jobs: job = ZookeeperJob(job_data['name'], self, self._client, self._persistence, path, @@ -405,32 +412,37 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): continue child_paths.append(k_paths.join(self.path, c)) - # Remove jobs that we know about but which are no longer children + # Figure out what we really should be investigating and what we + # shouldn't... + investigate_paths = [] with self._job_lock: removals = set() - for path, _job in six.iteritems(self._known_jobs): + for path in six.iterkeys(self._known_jobs): if path not in child_paths: removals.add(path) for path in removals: self._remove_job(path) - - # Ensure that we have a job record for each new job that has appeared - for path in child_paths: - if path in self._bad_paths: - continue - with self._job_lock: - if path not in self._known_jobs: - # Fire off the request to populate this job asynchronously. - # - # This method is *usually* called from a asynchronous - # handler so it's better to exit from this quickly to - # allow other asynchronous handlers to be executed. - request = self._client.get_async(path) - child_proc = functools.partial(self._process_child, path) - if delayed: - request.rawlink(child_proc) - else: - child_proc(request) + for path in child_paths: + if path in self._bad_paths: + continue + # This pre-check will not guarantee that we will not already + # have the job (if it's being populated elsewhere) but it will + # reduce the amount of duplicated requests in general. + if path in self._known_jobs: + continue + if path not in investigate_paths: + investigate_paths.append(path) + for path in investigate_paths: + # Fire off the request to populate this job. + # + # This method is *usually* called from a asynchronous handler so + # it's better to exit from this quickly to allow other asynchronous + # handlers to be executed. + request = self._client.get_async(path) + if delayed: + request.rawlink(functools.partial(self._process_child, path)) + else: + self._process_child(path, request) def post(self, name, book=None, details=None): |