summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-07-16 19:53:45 -0700
committerJoshua Harlow <harlowja@gmail.com>2014-09-27 14:51:38 -0700
commit15e3962e2a55a35fc2e87889f1a2085163c593ce (patch)
tree7e150575690e2eb780c61130ee2901aac4956502
parentf8d69ffc314cbab4c28a70db3c4f772c6af60fcb (diff)
downloadtaskflow-15e3962e2a55a35fc2e87889f1a2085163c593ce.tar.gz
Jobboard example that show jobs + workers + producers
Add a new example that spins up a set of threads to simulate the actual workers and producers that would be normally attached to a jobboard and use these threads to producer and consume a set of simple jobs (that are filtered on by the workers). - This also fixes how python3 removed the __cmp__ operator which we were using for sorting the jobs that were posted and now we must use __lt__ instead. Fixes bug 1367496 Part of blueprint more-examples Change-Id: Ib8d116637b8edae31e4c8927a28515907855f8bf
-rw-r--r--doc/source/examples.rst12
-rw-r--r--taskflow/examples/jobboard_produce_consume_colors.py178
-rw-r--r--taskflow/jobs/backends/impl_zookeeper.py20
3 files changed, 207 insertions, 3 deletions
diff --git a/doc/source/examples.rst b/doc/source/examples.rst
index 89731d8..f09fd67 100644
--- a/doc/source/examples.rst
+++ b/doc/source/examples.rst
@@ -198,3 +198,15 @@ Code
:language: python
:linenos:
:lines: 16-
+
+Jobboard producer/consumer (simple)
+===================================
+
+.. note::
+
+ Full source located at :example:`jobboard_produce_consume_colors`
+
+.. literalinclude:: ../../taskflow/examples/jobboard_produce_consume_colors.py
+ :language: python
+ :linenos:
+ :lines: 16-
diff --git a/taskflow/examples/jobboard_produce_consume_colors.py b/taskflow/examples/jobboard_produce_consume_colors.py
new file mode 100644
index 0000000..7ff9265
--- /dev/null
+++ b/taskflow/examples/jobboard_produce_consume_colors.py
@@ -0,0 +1,178 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import contextlib
+import logging
+import os
+import random
+import sys
+import threading
+import time
+
+logging.basicConfig(level=logging.ERROR)
+
+top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
+ os.pardir,
+ os.pardir))
+sys.path.insert(0, top_dir)
+
+import six
+from zake import fake_client
+
+from taskflow import exceptions as excp
+from taskflow.jobs import backends
+
+# In this example we show how a jobboard can be used to post work for other
+# entities to work on. This example creates a set of jobs using one producer
+# thread (typically this would be split across many machines) and then having
+# other worker threads with there own jobboards select work using a given
+# filters [red/blue] and then perform that work (and consuming or abandoning
+# the job after it has been completed or failed).
+
+# Things to note:
+# - No persistence layer is used (or logbook), just the job details are used
+# to determine if a job should be selected by a worker or not.
+# - This example runs in a single process (this is expected to be atypical
+# but this example shows that it can be done if needed, for testing...)
+# - The iterjobs(), claim(), consume()/abandon() worker workflow.
+# - The post() producer workflow.
+
+SHARED_CONF = {
+ 'path': "/taskflow/jobs",
+ 'board': 'zookeeper',
+}
+
+# How many workers and producers of work will be created (as threads).
+PRODUCERS = 3
+WORKERS = 5
+
+# How many units of work each producer will create.
+PRODUCER_UNITS = 10
+
+# How many units of work are expected to be produced (used so workers can
+# know when to stop running and shutdown, typically this would not be a
+# a value but we have to limit this examples execution time to be less than
+# infinity).
+EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS
+
+# Delay between producing/consuming more work.
+WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)
+
+# To ensure threads don't trample other threads output.
+STDOUT_LOCK = threading.Lock()
+
+
+def dispatch_work(job):
+ # This is where the jobs contained work *would* be done
+ time.sleep(1.0)
+
+
+def safe_print(name, message, prefix=""):
+ with STDOUT_LOCK:
+ if prefix:
+ print("%s %s: %s" % (prefix, name, message))
+ else:
+ print("%s: %s" % (name, message))
+
+
+def worker(ident, client, consumed):
+ # Create a personal board (using the same client so that it works in
+ # the same process) and start looking for jobs on the board that we want
+ # to perform.
+ name = "W-%s" % (ident)
+ safe_print(name, "started")
+ claimed_jobs = 0
+ consumed_jobs = 0
+ abandoned_jobs = 0
+ with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
+ while len(consumed) != EXPECTED_UNITS:
+ favorite_color = random.choice(['blue', 'red'])
+ for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
+ # See if we should even bother with it...
+ if job.details.get('color') != favorite_color:
+ continue
+ safe_print(name, "'%s' [attempting claim]" % (job))
+ try:
+ board.claim(job, name)
+ claimed_jobs += 1
+ safe_print(name, "'%s' [claimed]" % (job))
+ except (excp.NotFound, excp.UnclaimableJob):
+ safe_print(name, "'%s' [claim unsuccessful]" % (job))
+ else:
+ try:
+ dispatch_work(job)
+ board.consume(job, name)
+ safe_print(name, "'%s' [consumed]" % (job))
+ consumed_jobs += 1
+ consumed.append(job)
+ except Exception:
+ board.abandon(job, name)
+ abandoned_jobs += 1
+ safe_print(name, "'%s' [abandoned]" % (job))
+ time.sleep(WORKER_DELAY)
+ safe_print(name,
+ "finished (claimed %s jobs, consumed %s jobs,"
+ " abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
+ abandoned_jobs), prefix=">>>")
+
+
+def producer(ident, client):
+ # Create a personal board (using the same client so that it works in
+ # the same process) and start posting jobs on the board that we want
+ # some entity to perform.
+ name = "P-%s" % (ident)
+ safe_print(name, "started")
+ with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
+ for i in six.moves.xrange(0, PRODUCER_UNITS):
+ job_name = "%s-%s" % (name, i)
+ details = {
+ 'color': random.choice(['red', 'blue']),
+ }
+ job = board.post(job_name, book=None, details=details)
+ safe_print(name, "'%s' [posted]" % (job))
+ time.sleep(PRODUCER_DELAY)
+ safe_print(name, "finished", prefix=">>>")
+
+
+def main():
+ with contextlib.closing(fake_client.FakeClient()) as c:
+ created = []
+ for i in range(0, PRODUCERS):
+ p = threading.Thread(target=producer, args=(i + 1, c))
+ p.daemon = True
+ created.append(p)
+ p.start()
+ consumed = collections.deque()
+ for i in range(0, WORKERS):
+ w = threading.Thread(target=worker, args=(i + 1, c, consumed))
+ w.daemon = True
+ created.append(w)
+ w.start()
+ while created:
+ t = created.pop()
+ t.join()
+ # At the end there should be nothing leftover, let's verify that.
+ board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
+ board.connect()
+ with contextlib.closing(board):
+ if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
+ return 1
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py
index 9630779..150daa4 100644
--- a/taskflow/jobs/backends/impl_zookeeper.py
+++ b/taskflow/jobs/backends/impl_zookeeper.py
@@ -77,10 +77,13 @@ class ZookeeperJob(base_job.Job):
if all((self._book, self._book_data)):
raise ValueError("Only one of 'book_data' or 'book'"
" can be provided")
- self._path = path
+ self._path = k_paths.normpath(path)
self._lock_path = path + LOCK_POSTFIX
self._created_on = created_on
self._node_not_found = False
+ basename = k_paths.basename(self._path)
+ self._root = self._path[0:-len(basename)]
+ self._sequence = int(basename[len(JOB_PREFIX):])
@property
def lock_path(self):
@@ -90,6 +93,14 @@ class ZookeeperJob(base_job.Job):
def path(self):
return self._path
+ @property
+ def sequence(self):
+ return self._sequence
+
+ @property
+ def root(self):
+ return self._root
+
def _get_node_attr(self, path, attr_name, trans_func=None):
try:
_data, node_stat = self._client.get(path)
@@ -186,8 +197,11 @@ class ZookeeperJob(base_job.Job):
return states.UNCLAIMED
return states.CLAIMED
- def __cmp__(self, other):
- return cmp(self.path, other.path)
+ def __lt__(self, other):
+ if self.root == other.root:
+ return self.sequence < other.sequence
+ else:
+ return self.root < other.root
def __hash__(self):
return hash(self.path)