diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-10-04 02:12:55 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-10-04 02:12:55 +0000 |
commit | 83271e26a583758c9765fa8670a9a4edc4e9cd7c (patch) | |
tree | 008839ec0d5b3b85fb3bde0facc203cff1b60cb3 | |
parent | 6e8e2355c29514fdba47483de513785400085b64 (diff) | |
parent | 15e3962e2a55a35fc2e87889f1a2085163c593ce (diff) | |
download | taskflow-83271e26a583758c9765fa8670a9a4edc4e9cd7c.tar.gz |
Merge "Jobboard example that show jobs + workers + producers"
-rw-r--r-- | doc/source/examples.rst | 12 | ||||
-rw-r--r-- | taskflow/examples/jobboard_produce_consume_colors.py | 178 | ||||
-rw-r--r-- | taskflow/jobs/backends/impl_zookeeper.py | 20 |
3 files changed, 207 insertions, 3 deletions
diff --git a/doc/source/examples.rst b/doc/source/examples.rst index 89731d8..f09fd67 100644 --- a/doc/source/examples.rst +++ b/doc/source/examples.rst @@ -198,3 +198,15 @@ Code :language: python :linenos: :lines: 16- + +Jobboard producer/consumer (simple) +=================================== + +.. note:: + + Full source located at :example:`jobboard_produce_consume_colors` + +.. literalinclude:: ../../taskflow/examples/jobboard_produce_consume_colors.py + :language: python + :linenos: + :lines: 16- diff --git a/taskflow/examples/jobboard_produce_consume_colors.py b/taskflow/examples/jobboard_produce_consume_colors.py new file mode 100644 index 0000000..7ff9265 --- /dev/null +++ b/taskflow/examples/jobboard_produce_consume_colors.py @@ -0,0 +1,178 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import contextlib +import logging +import os +import random +import sys +import threading +import time + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +import six +from zake import fake_client + +from taskflow import exceptions as excp +from taskflow.jobs import backends + +# In this example we show how a jobboard can be used to post work for other +# entities to work on. 
# This example creates a set of jobs using one producer thread (typically
# this would be split across many machines) and then has other worker
# threads, each with their own jobboard, select work using given filters
# [red/blue] and then perform that work (consuming or abandoning the job
# after it has been completed or has failed).

# Things to note:
# - No persistence layer is used (or logbook), just the job details are used
#   to determine if a job should be selected by a worker or not.
# - This example runs in a single process (this is expected to be atypical
#   but this example shows that it can be done if needed, for testing...)
# - The iterjobs(), claim(), consume()/abandon() worker workflow.
# - The post() producer workflow.

SHARED_CONF = {
    'path': "/taskflow/jobs",
    'board': 'zookeeper',
}

# How many workers and producers of work will be created (as threads).
PRODUCERS = 3
WORKERS = 5

# How many units of work each producer will create.
PRODUCER_UNITS = 10

# How many units of work are expected to be produced (used so workers can
# know when to stop running and shutdown; typically this would not be known
# ahead of time, but we have to limit this example's execution time to be
# less than infinity).
EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS

# Delay between producing/consuming more work.
WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)

# To ensure threads don't trample each other's output.
STDOUT_LOCK = threading.Lock()


def dispatch_work(job):
    # This is where the job's contained work *would* be done.
    time.sleep(1.0)


def safe_print(name, message, prefix=""):
    """Print a message attributed to *name*, serialized via STDOUT_LOCK.

    :param name: identifier of the thread doing the printing
    :param message: text to print after the name
    :param prefix: optional marker printed before the name (e.g. ">>>")
    """
    with STDOUT_LOCK:
        if prefix:
            print("%s %s: %s" % (prefix, name, message))
        else:
            print("%s: %s" % (name, message))


def worker(ident, client, consumed):
    """Worker thread: claim, perform and consume (or abandon) jobs.

    Creates a personal board (using the same client so that it works in
    the same process) and keeps scanning it for unclaimed jobs matching a
    randomly picked 'favorite' color until the shared ``consumed``
    collection has reached EXPECTED_UNITS entries.

    :param ident: numeric identity used to build this worker's name
    :param client: (fake) zookeeper client shared by all threads
    :param consumed: shared collection that successfully consumed jobs
                     are appended to (also acts as the stop condition)
    """
    name = "W-%s" % (ident)
    safe_print(name, "started")
    claimed_jobs = 0
    consumed_jobs = 0
    abandoned_jobs = 0
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        while len(consumed) != EXPECTED_UNITS:
            favorite_color = random.choice(['blue', 'red'])
            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
                # See if we should even bother with it...
                if job.details.get('color') != favorite_color:
                    continue
                safe_print(name, "'%s' [attempting claim]" % (job))
                try:
                    board.claim(job, name)
                    claimed_jobs += 1
                    safe_print(name, "'%s' [claimed]" % (job))
                except (excp.NotFound, excp.UnclaimableJob):
                    # Some other worker got it first (or it vanished);
                    # just move on to the next candidate.
                    safe_print(name, "'%s' [claim unsuccessful]" % (job))
                else:
                    try:
                        dispatch_work(job)
                        board.consume(job, name)
                        safe_print(name, "'%s' [consumed]" % (job))
                        consumed_jobs += 1
                        consumed.append(job)
                    except Exception:
                        # Best-effort: on any failure give the job back so
                        # another worker can attempt it later.
                        board.abandon(job, name)
                        abandoned_jobs += 1
                        safe_print(name, "'%s' [abandoned]" % (job))
            # NOTE(review): indentation was lost in extraction; this delay is
            # assumed to sit at while-loop level (one pause per full scan of
            # the board) -- confirm against the upstream example.
            time.sleep(WORKER_DELAY)
    safe_print(name,
               "finished (claimed %s jobs, consumed %s jobs,"
               " abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
                                        abandoned_jobs), prefix=">>>")


def producer(ident, client):
    """Producer thread: post PRODUCER_UNITS colored jobs onto a board."""
    # Create a personal board (using the same client so that it works in
    # the same process) and start posting jobs on the board that we want
    # some entity to perform.
+ name = "P-%s" % (ident) + safe_print(name, "started") + with backends.backend(name, SHARED_CONF.copy(), client=client) as board: + for i in six.moves.xrange(0, PRODUCER_UNITS): + job_name = "%s-%s" % (name, i) + details = { + 'color': random.choice(['red', 'blue']), + } + job = board.post(job_name, book=None, details=details) + safe_print(name, "'%s' [posted]" % (job)) + time.sleep(PRODUCER_DELAY) + safe_print(name, "finished", prefix=">>>") + + +def main(): + with contextlib.closing(fake_client.FakeClient()) as c: + created = [] + for i in range(0, PRODUCERS): + p = threading.Thread(target=producer, args=(i + 1, c)) + p.daemon = True + created.append(p) + p.start() + consumed = collections.deque() + for i in range(0, WORKERS): + w = threading.Thread(target=worker, args=(i + 1, c, consumed)) + w.daemon = True + created.append(w) + w.start() + while created: + t = created.pop() + t.join() + # At the end there should be nothing leftover, let's verify that. + board = backends.fetch('verifier', SHARED_CONF.copy(), client=c) + board.connect() + with contextlib.closing(board): + if board.job_count != 0 or len(consumed) != EXPECTED_UNITS: + return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 9630779..150daa4 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -77,10 +77,13 @@ class ZookeeperJob(base_job.Job): if all((self._book, self._book_data)): raise ValueError("Only one of 'book_data' or 'book'" " can be provided") - self._path = path + self._path = k_paths.normpath(path) self._lock_path = path + LOCK_POSTFIX self._created_on = created_on self._node_not_found = False + basename = k_paths.basename(self._path) + self._root = self._path[0:-len(basename)] + self._sequence = int(basename[len(JOB_PREFIX):]) @property def lock_path(self): @@ -90,6 +93,14 @@ class ZookeeperJob(base_job.Job): def 
path(self): return self._path + @property + def sequence(self): + return self._sequence + + @property + def root(self): + return self._root + def _get_node_attr(self, path, attr_name, trans_func=None): try: _data, node_stat = self._client.get(path) @@ -186,8 +197,11 @@ class ZookeeperJob(base_job.Job): return states.UNCLAIMED return states.CLAIMED - def __cmp__(self, other): - return cmp(self.path, other.path) + def __lt__(self, other): + if self.root == other.root: + return self.sequence < other.sequence + else: + return self.root < other.root def __hash__(self): return hash(self.path) |