author     Dan Krause <dan.krause@rackspace.com>     2014-05-16 09:38:55 -0500
committer  Joshua Harlow <harlowja@yahoo-inc.com>    2014-05-28 12:08:06 -0700
commit     c386a5f9d448d7318a3d89e709e20f6959ce9110 (patch)
tree       a63562bc23272a8953f2e8d0341291806f45e403
parent     268d935a0a5adff7f0d59bdf3882210c49e45f5c (diff)
download   taskflow-c386a5f9d448d7318a3d89e709e20f6959ce9110.tar.gz
Adds a single threaded flow conductor
Creates a new conductor module that can be used to connect into the
jobboard, engine, and persistence mechanisms. This commit adds support for
a simple conductor that runs jobs in its own thread, dispatching them and
then consuming (or abandoning) them.

Implements: blueprint generic-flow-conductor
Change-Id: Ic610bc825506db57b0c4364b0fc588b51d453a76
-rw-r--r--  taskflow/conductors/__init__.py                    0
-rw-r--r--  taskflow/conductors/base.py                       90
-rw-r--r--  taskflow/conductors/single_threaded.py           150
-rw-r--r--  taskflow/exceptions.py                             6
-rw-r--r--  taskflow/jobs/job.py                               5
-rw-r--r--  taskflow/jobs/jobboard.py                          4
-rw-r--r--  taskflow/tests/unit/conductor/__init__.py          0
-rw-r--r--  taskflow/tests/unit/conductor/test_conductor.py  153
8 files changed, 408 insertions, 0 deletions
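
For orientation, here is a minimal wiring sketch modelled directly on the make_components() helper in the tests added by this commit. The zake FakeClient and the in-memory persistence backend stand in for a real ZooKeeper client and database, and the "demo" names are placeholders; this is a sketch of how the new pieces fit together, not code from the change itself.

import threading

from zake import fake_client

from taskflow.conductors import single_threaded as stc
from taskflow.jobs.backends import impl_zookeeper
from taskflow.persistence.backends import impl_memory

# Fake/in-memory backends, as used by the tests added in this commit.
client = fake_client.FakeClient()
persistence = impl_memory.MemoryBackend()
board = impl_zookeeper.ZookeeperJobBoard("demo", {},
                                         client=client,
                                         persistence=persistence)
conductor = stc.SingleThreadedConductor("demo-conductor", board,
                                        {'engine': 'default'}, persistence,
                                        wait_timeout=0.1)
conductor.connect()

# run() blocks while dispatching, so drive it from a helper thread (the
# tests below do the same via make_thread()).
worker = threading.Thread(target=conductor.run)
worker.daemon = True
worker.start()

# ... post jobs to the board here (see the posting sketch after base.py) ...

conductor.stop(timeout=1.0)
conductor.close()
client.close()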
diff --git a/taskflow/conductors/__init__.py b/taskflow/conductors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/taskflow/conductors/__init__.py
diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py
new file mode 100644
index 0000000..634c5de
--- /dev/null
+++ b/taskflow/conductors/base.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import threading
+
+import six
+
+import taskflow.engines
+from taskflow import exceptions as excp
+from taskflow.utils import lock_utils
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Conductor(object):
+    """Conductors act as entities which extract jobs from a jobboard, assign
+    their work to some engine (using some desired configuration) and then
+    wait for that work to complete. If the work fails then they abandon the
+    claimed work (or if the process they are running in crashes or dies this
+    abandonment happens automatically) and then another conductor at a later
+    time will finish up the prior failed conductor's work.
+    """
+
+ def __init__(self, name, jobboard, engine_conf, persistence):
+ self._name = name
+ self._jobboard = jobboard
+ self._engine_conf = engine_conf
+ self._persistence = persistence
+ self._lock = threading.RLock()
+
+ def _engine_from_job(self, job):
+ try:
+ flow_uuid = job.details["flow_uuid"]
+ except (KeyError, TypeError):
+ raise excp.NotFound("No flow detail uuid found in job")
+ else:
+ try:
+ flow_detail = job.book.find(flow_uuid)
+ except (TypeError, AttributeError):
+ flow_detail = None
+ if flow_detail is None:
+ raise excp.NotFound("No matching flow detail found in"
+ " job for flow detail uuid %s" % flow_uuid)
+ try:
+ store = dict(job.details["store"])
+ except (KeyError, TypeError):
+ store = {}
+ return taskflow.engines.load_from_detail(
+ flow_detail,
+ store=store,
+ engine_conf=dict(self._engine_conf),
+ backend=self._persistence)
+
+ @lock_utils.locked
+ def connect(self):
+ """Ensures the jobboard is connected (noop if it is already)."""
+ if not self._jobboard.connected:
+ self._jobboard.connect()
+
+ @lock_utils.locked
+ def close(self):
+ """Closes the jobboard, disallowing further use."""
+ self._jobboard.close()
+
+ @abc.abstractmethod
+ def run(self):
+ """Continuously claims, runs, and consumes jobs, and waits for more
+ jobs when there are none left on the jobboard.
+ """
+
+ @abc.abstractmethod
+ def _dispatch_job(self, job):
+        """Accepts a single (already claimed) job and causes it to be run in
+        an engine. The job is consumed upon completion (unless False is
+        returned, which signifies that the job should be abandoned instead).
+
+ :param job: A Job instance that has already been claimed by the
+ jobboard.
+ """
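
The _engine_from_job() method above requires the claimed job's details to carry a 'flow_uuid' key (a missing key raises NotFound) and optionally a 'store' dict that is handed to the engine as initial data. Below is a hedged sketch of posting such a job, following the pattern the tests in this commit use; persistence and board come from the wiring sketch after the diffstat, and the Hello task and flow_factory are illustrative placeholders, not part of this change.

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.utils import persistence_utils as pu


class Hello(task.Task):
    def execute(self):
        print("hello from a conducted flow")


def flow_factory():
    # Illustrative one-task flow; any factory that rebuilds the flow works.
    return lf.Flow("demo-flow").add(Hello("hello"))


# Create a logbook + flow detail in the shared persistence backend and record
# which factory rebuilds the flow (exactly what the tests below do).
lb, fd = pu.temporary_flow_detail(persistence)
engines.save_factory_details(fd, flow_factory, [], {}, backend=persistence)

# Post the job; a running conductor claims it, loads an engine from the flow
# detail referenced by 'flow_uuid', and injects the optional 'store' dict.
board.post("demo-job", lb, details={'flow_uuid': fd.uuid,
                                    'store': {'greeting': 'hello'}})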
diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py
new file mode 100644
index 0000000..8720110
--- /dev/null
+++ b/taskflow/conductors/single_threaded.py
@@ -0,0 +1,150 @@
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import threading
+
+import six
+
+from taskflow.conductors import base
+from taskflow import exceptions as excp
+from taskflow.listeners import logging as logging_listener
+from taskflow.utils import lock_utils
+from taskflow.utils import misc
+
+LOG = logging.getLogger(__name__)
+WAIT_TIMEOUT = 0.5
+NO_CONSUME_EXCEPTIONS = tuple([
+ excp.ExecutionFailure,
+ excp.StorageFailure,
+])
+
+
+class SingleThreadedConductor(base.Conductor):
+ """A conductor that runs jobs in its own dispatching loop.
+
+    This conductor iterates over jobs in the provided jobboard (waiting for
+    the given timeout if no jobs exist), attempts to claim them, works on
+    those jobs in its local thread (blocking further work from being claimed
+    and consumed) and then consumes those work units after completion. This
+    process repeats until the conductor has been stopped or some other
+    critical error occurs.
+
+    NOTE(harlowja): consumption occurs even if an engine fails to run due to
+    a task failure. Consumption is only skipped when an execution failure or
+    a storage failure occurs, since those are *usually* correctable by
+    re-running on a different conductor (storage and execution failures may
+    be transient issues that can be worked around by a later execution). If
+    a job, after completing, can not be consumed or abandoned, the conductor
+    relies upon the jobboard's capabilities to automatically abandon it.
+ """
+
+ def __init__(self, name, jobboard, engine_conf, persistence,
+ wait_timeout=None):
+ super(SingleThreadedConductor, self).__init__(name, jobboard,
+ engine_conf,
+ persistence)
+ if wait_timeout is None:
+ wait_timeout = WAIT_TIMEOUT
+ if isinstance(wait_timeout, (int, float) + six.string_types):
+ self._wait_timeout = misc.Timeout(float(wait_timeout))
+ elif isinstance(wait_timeout, misc.Timeout):
+ self._wait_timeout = wait_timeout
+ else:
+ raise ValueError("Invalid timeout literal: %s" % (wait_timeout))
+ self._dead = threading.Event()
+
+ @lock_utils.locked
+ def stop(self, timeout=None):
+        """Requests that dispatching cease and returns whether the dispatcher
+        loop is still active. If a timeout is provided, the loop may not have
+        ceased by the time the timeout is reached (the request to cease will
+        be honored in the future).
+ """
+ self._wait_timeout.interrupt()
+ self._dead.wait(timeout)
+ return self.dispatching
+
+ @property
+ def dispatching(self):
+ if self._dead.is_set():
+ return False
+ return True
+
+ def _dispatch_job(self, job):
+ LOG.info("Dispatching job: %s", job)
+ try:
+ engine = self._engine_from_job(job)
+ except Exception as e:
+ raise excp.ConductorFailure("Failed creating an engine", cause=e)
+ with logging_listener.LoggingListener(engine, log=LOG):
+ consume = True
+ try:
+ engine.run()
+ except excp.WrappedFailure as e:
+ if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)):
+ LOG.warn("Job execution failed (consumption being"
+ " skipped): %s", job, exc_info=True)
+ consume = False
+ else:
+ LOG.warn("Job execution failed: %s", job, exc_info=True)
+ except NO_CONSUME_EXCEPTIONS:
+ LOG.warn("Job execution failed (consumption being"
+ " skipped): %s", job, exc_info=True)
+ consume = False
+ except Exception:
+ LOG.warn("Job execution failed: %s", job, exc_info=True)
+ else:
+ LOG.info("Job completed successfully: %s", job)
+ return consume
+
+ def run(self):
+ self._dead.clear()
+ try:
+ while True:
+ if self._wait_timeout.is_stopped():
+ break
+ dispatched = 0
+ for job in self._jobboard.iterjobs():
+ if self._wait_timeout.is_stopped():
+ break
+ LOG.debug("Trying to claim job: %s", job)
+ try:
+ self._jobboard.claim(job, self._name)
+ except (excp.UnclaimableJob, excp.NotFound):
+ LOG.debug("Job already claimed or consumed: %s", job)
+ continue
+ dispatched += 1
+ try:
+ consume = self._dispatch_job(job)
+ except excp.ConductorFailure:
+ LOG.warn("Job dispatching failed: %s", job,
+ exc_info=True)
+ else:
+ try:
+ if consume:
+ self._jobboard.consume(job, self._name)
+ else:
+ self._jobboard.abandon(job, self._name)
+ except excp.JobFailure:
+ if consume:
+ LOG.warn("Failed job consumption: %s", job,
+ exc_info=True)
+ else:
+ LOG.warn("Failed job abandonment: %s", job,
+ exc_info=True)
+ if dispatched == 0 and not self._wait_timeout.is_stopped():
+ self._wait_timeout.wait()
+ finally:
+ self._dead.set()
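
A note on the stop()/dispatching contract implemented above: stop() only interrupts the wait timeout and then waits up to the given timeout for the dispatch loop to exit, returning whether the loop is still dispatching. A small sketch of using that contract, continuing the wiring sketch after the diffstat (conductor and worker are defined there):

still_dispatching = conductor.stop(timeout=1.0)
if still_dispatching:
    # The loop has not ceased yet (it may be mid-job); the request will be
    # honored once the current iteration finishes, so wait for the thread.
    worker.join()
assert not conductor.dispatching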
diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py
index 95e378a..f3d2109 100644
--- a/taskflow/exceptions.py
+++ b/taskflow/exceptions.py
@@ -61,6 +61,12 @@ class StorageFailure(TaskFlowException):
"""Raised when storage backends can not be read/saved/deleted."""
+# Conductor related errors.
+
+class ConductorFailure(TaskFlowException):
+ """Errors related to conducting activities."""
+
+
# Job related errors.
class JobFailure(TaskFlowException):
diff --git a/taskflow/jobs/job.py b/taskflow/jobs/job.py
index a4f0b41..796e5d1 100644
--- a/taskflow/jobs/job.py
+++ b/taskflow/jobs/job.py
@@ -99,3 +99,8 @@ class Job(object):
def name(self):
"""The non-uniquely identifying name of this job."""
return self._name
+
+ def __str__(self):
+ """Pretty formats the job into something *more* meaningful."""
+ return "%s %s (%s): %s" % (type(self).__name__,
+ self.name, self.uuid, self.details)
diff --git a/taskflow/jobs/jobboard.py b/taskflow/jobs/jobboard.py
index 40c1179..5857d55 100644
--- a/taskflow/jobs/jobboard.py
+++ b/taskflow/jobs/jobboard.py
@@ -154,6 +154,10 @@ class JobBoard(object):
this must be the same name that was used for claiming this job.
"""
+ @abc.abstractproperty
+ def connected(self):
+        """Returns whether this jobboard is connected."""
+
@abc.abstractmethod
def connect(self):
"""Opens the connection to any backend system."""
diff --git a/taskflow/tests/unit/conductor/__init__.py b/taskflow/tests/unit/conductor/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/taskflow/tests/unit/conductor/__init__.py
diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py
new file mode 100644
index 0000000..7ac75d9
--- /dev/null
+++ b/taskflow/tests/unit/conductor/test_conductor.py
@@ -0,0 +1,153 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import threading
+
+from zake import fake_client
+
+from taskflow.conductors import single_threaded as stc
+from taskflow import engines
+from taskflow.jobs.backends import impl_zookeeper
+from taskflow.jobs import jobboard
+from taskflow.patterns import linear_flow as lf
+from taskflow.persistence.backends import impl_memory
+from taskflow import states as st
+from taskflow import test
+from taskflow.tests import utils as test_utils
+from taskflow.utils import misc
+from taskflow.utils import persistence_utils as pu
+
+
+@contextlib.contextmanager
+def close_many(*closeables):
+ try:
+ yield
+ finally:
+ for c in closeables:
+ c.close()
+
+
+def test_factory(blowup):
+ f = lf.Flow("test")
+ if not blowup:
+ f.add(test_utils.SaveOrderTask('test1'))
+ else:
+ f.add(test_utils.FailingTask("test1"))
+ return f
+
+
+def make_thread(conductor):
+ t = threading.Thread(target=conductor.run)
+ t.daemon = True
+ return t
+
+
+class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
+ def make_components(self, name='testing', wait_timeout=0.1):
+ client = fake_client.FakeClient()
+ persistence = impl_memory.MemoryBackend()
+ board = impl_zookeeper.ZookeeperJobBoard(name, {},
+ client=client,
+ persistence=persistence)
+ engine_conf = {
+ 'engine': 'default',
+ }
+ conductor = stc.SingleThreadedConductor(name, board, engine_conf,
+ persistence, wait_timeout)
+ return misc.AttrDict(board=board,
+ client=client,
+ persistence=persistence,
+ conductor=conductor)
+
+ def test_connection(self):
+ components = self.make_components()
+ components.conductor.connect()
+ with close_many(components.conductor, components.client):
+ self.assertTrue(components.board.connected)
+ self.assertTrue(components.client.connected)
+ self.assertFalse(components.board.connected)
+ self.assertFalse(components.client.connected)
+
+ def test_run_empty(self):
+ components = self.make_components()
+ components.conductor.connect()
+ with close_many(components.conductor, components.client):
+ t = make_thread(components.conductor)
+ t.start()
+ self.assertFalse(components.conductor.stop(0.5))
+ t.join()
+
+ def test_run(self):
+ components = self.make_components()
+ components.conductor.connect()
+ consumed_event = threading.Event()
+
+ def on_consume(state, details):
+ consumed_event.set()
+
+ components.board.notifier.register(jobboard.REMOVAL, on_consume)
+ with close_many(components.conductor, components.client):
+ t = make_thread(components.conductor)
+ t.start()
+ lb, fd = pu.temporary_flow_detail(components.persistence)
+ engines.save_factory_details(fd, test_factory,
+ [False], {},
+ backend=components.persistence)
+ components.board.post('poke', lb,
+ details={'flow_uuid': fd.uuid})
+ consumed_event.wait(1.0)
+ self.assertTrue(consumed_event.is_set())
+ components.conductor.stop(1.0)
+ self.assertFalse(components.conductor.dispatching)
+
+ persistence = components.persistence
+ with contextlib.closing(persistence.get_connection()) as conn:
+ lb = conn.get_logbook(lb.uuid)
+ fd = lb.find(fd.uuid)
+ self.assertIsNotNone(fd)
+ self.assertEqual(st.SUCCESS, fd.state)
+
+ def test_fail_run(self):
+ components = self.make_components()
+ components.conductor.connect()
+
+ consumed_event = threading.Event()
+
+ def on_consume(state, details):
+ consumed_event.set()
+
+ components.board.notifier.register(jobboard.REMOVAL, on_consume)
+ with close_many(components.conductor, components.client):
+ t = make_thread(components.conductor)
+ t.start()
+ lb, fd = pu.temporary_flow_detail(components.persistence)
+ engines.save_factory_details(fd, test_factory,
+ [True], {},
+ backend=components.persistence)
+ components.board.post('poke', lb,
+ details={'flow_uuid': fd.uuid})
+ consumed_event.wait(1.0)
+ self.assertTrue(consumed_event.is_set())
+ components.conductor.stop(1.0)
+ self.assertFalse(components.conductor.dispatching)
+
+ persistence = components.persistence
+ with contextlib.closing(persistence.get_connection()) as conn:
+ lb = conn.get_logbook(lb.uuid)
+ fd = lb.find(fd.uuid)
+ self.assertIsNotNone(fd)
+ self.assertEqual(st.REVERTED, fd.state)