diff options
author | Dan Krause <dan.krause@rackspace.com> | 2014-05-16 09:38:55 -0500 |
---|---|---|
committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-05-28 12:08:06 -0700 |
commit | c386a5f9d448d7318a3d89e709e20f6959ce9110 (patch) | |
tree | a63562bc23272a8953f2e8d0341291806f45e403 | |
parent | 268d935a0a5adff7f0d59bdf3882210c49e45f5c (diff) | |
download | taskflow-c386a5f9d448d7318a3d89e709e20f6959ce9110.tar.gz |
Adds a single threaded flow conductor
Creates a new conductor module that can be used
to connect to the jobboard, engine, and persistence
mechanism.
This commit adds support for a simple conductor
that will run jobs in its own thread, dispatch
them, and consume/abandon them.
Implements: blueprint generic-flow-conductor
Change-Id: Ic610bc825506db57b0c4364b0fc588b51d453a76
-rw-r--r-- | taskflow/conductors/__init__.py | 0 | ||||
-rw-r--r-- | taskflow/conductors/base.py | 90 | ||||
-rw-r--r-- | taskflow/conductors/single_threaded.py | 150 | ||||
-rw-r--r-- | taskflow/exceptions.py | 6 | ||||
-rw-r--r-- | taskflow/jobs/job.py | 5 | ||||
-rw-r--r-- | taskflow/jobs/jobboard.py | 4 | ||||
-rw-r--r-- | taskflow/tests/unit/conductor/__init__.py | 0 | ||||
-rw-r--r-- | taskflow/tests/unit/conductor/test_conductor.py | 153 |
8 files changed, 408 insertions, 0 deletions
diff --git a/taskflow/conductors/__init__.py b/taskflow/conductors/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/taskflow/conductors/__init__.py diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py new file mode 100644 index 0000000..634c5de --- /dev/null +++ b/taskflow/conductors/base.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import threading + +import six + +import taskflow.engines +from taskflow import exceptions as excp +from taskflow.utils import lock_utils + + +@six.add_metaclass(abc.ABCMeta) +class Conductor(object): + """Conductors act as entities which extract jobs from a jobboard, assign + there work to some engine (using some desired configuration) and then wait + for that work to complete. If the work fails then they abandon the claimed + work (or if the process they are running in crashes or dies this + abandonment happens automatically) and then another conductor at a later + period of time will finish up the prior failed conductors work. 
+ """ + + def __init__(self, name, jobboard, engine_conf, persistence): + self._name = name + self._jobboard = jobboard + self._engine_conf = engine_conf + self._persistence = persistence + self._lock = threading.RLock() + + def _engine_from_job(self, job): + try: + flow_uuid = job.details["flow_uuid"] + except (KeyError, TypeError): + raise excp.NotFound("No flow detail uuid found in job") + else: + try: + flow_detail = job.book.find(flow_uuid) + except (TypeError, AttributeError): + flow_detail = None + if flow_detail is None: + raise excp.NotFound("No matching flow detail found in" + " job for flow detail uuid %s" % flow_uuid) + try: + store = dict(job.details["store"]) + except (KeyError, TypeError): + store = {} + return taskflow.engines.load_from_detail( + flow_detail, + store=store, + engine_conf=dict(self._engine_conf), + backend=self._persistence) + + @lock_utils.locked + def connect(self): + """Ensures the jobboard is connected (noop if it is already).""" + if not self._jobboard.connected: + self._jobboard.connect() + + @lock_utils.locked + def close(self): + """Closes the jobboard, disallowing further use.""" + self._jobboard.close() + + @abc.abstractmethod + def run(self): + """Continuously claims, runs, and consumes jobs, and waits for more + jobs when there are none left on the jobboard. + """ + + @abc.abstractmethod + def _dispatch_job(self, job): + """Accepts a single (already claimed) job and causes it to be run in + an engine. The job is consumed upon completion (unless False is + returned which will signify the job should be abandoned instead) + + :param job: A Job instance that has already been claimed by the + jobboard. 
+ """ diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py new file mode 100644 index 0000000..8720110 --- /dev/null +++ b/taskflow/conductors/single_threaded.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import threading + +import six + +from taskflow.conductors import base +from taskflow import exceptions as excp +from taskflow.listeners import logging as logging_listener +from taskflow.utils import lock_utils +from taskflow.utils import misc + +LOG = logging.getLogger(__name__) +WAIT_TIMEOUT = 0.5 +NO_CONSUME_EXCEPTIONS = tuple([ + excp.ExecutionFailure, + excp.StorageFailure, +]) + + +class SingleThreadedConductor(base.Conductor): + """A conductor that runs jobs in its own dispatching loop. + + This conductor iterates over jobs in the provided jobboard (waiting for + the given timeout if no jobs exist) and attempts to claim them, work on + those jobs in its local thread (blocking further work from being claimed + and consumed) and then consume those work units after completetion. This + process will repeat until the conductor has been stopped or other critical + error occurs. + + NOTE(harlowja): consumption occurs even if a engine fails to run due to + a task failure. 
This is only skipped when an execution failure or + a storage failure occurs which are *usually* correctable by re-running on + a different conductor (storage failures and execution failures may be + transient issues that can be worked around by later execution). If a job + after completing can not be consumed or abandoned the conductor relies + upon the jobboard capabilities to automatically abandon these jobs. + """ + + def __init__(self, name, jobboard, engine_conf, persistence, + wait_timeout=None): + super(SingleThreadedConductor, self).__init__(name, jobboard, + engine_conf, + persistence) + if wait_timeout is None: + wait_timeout = WAIT_TIMEOUT + if isinstance(wait_timeout, (int, float) + six.string_types): + self._wait_timeout = misc.Timeout(float(wait_timeout)) + elif isinstance(wait_timeout, misc.Timeout): + self._wait_timeout = wait_timeout + else: + raise ValueError("Invalid timeout literal: %s" % (wait_timeout)) + self._dead = threading.Event() + + @lock_utils.locked + def stop(self, timeout=None): + """Stops dispatching and returns whether the dispatcher loop is active + or whether it has ceased. If a timeout is provided the dispatcher + loop may not have ceased by the timeout reached (the request to cease + will be honored in the future). 
+ """ + self._wait_timeout.interrupt() + self._dead.wait(timeout) + return self.dispatching + + @property + def dispatching(self): + if self._dead.is_set(): + return False + return True + + def _dispatch_job(self, job): + LOG.info("Dispatching job: %s", job) + try: + engine = self._engine_from_job(job) + except Exception as e: + raise excp.ConductorFailure("Failed creating an engine", cause=e) + with logging_listener.LoggingListener(engine, log=LOG): + consume = True + try: + engine.run() + except excp.WrappedFailure as e: + if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)): + LOG.warn("Job execution failed (consumption being" + " skipped): %s", job, exc_info=True) + consume = False + else: + LOG.warn("Job execution failed: %s", job, exc_info=True) + except NO_CONSUME_EXCEPTIONS: + LOG.warn("Job execution failed (consumption being" + " skipped): %s", job, exc_info=True) + consume = False + except Exception: + LOG.warn("Job execution failed: %s", job, exc_info=True) + else: + LOG.info("Job completed successfully: %s", job) + return consume + + def run(self): + self._dead.clear() + try: + while True: + if self._wait_timeout.is_stopped(): + break + dispatched = 0 + for job in self._jobboard.iterjobs(): + if self._wait_timeout.is_stopped(): + break + LOG.debug("Trying to claim job: %s", job) + try: + self._jobboard.claim(job, self._name) + except (excp.UnclaimableJob, excp.NotFound): + LOG.debug("Job already claimed or consumed: %s", job) + continue + dispatched += 1 + try: + consume = self._dispatch_job(job) + except excp.ConductorFailure: + LOG.warn("Job dispatching failed: %s", job, + exc_info=True) + else: + try: + if consume: + self._jobboard.consume(job, self._name) + else: + self._jobboard.abandon(job, self._name) + except excp.JobFailure: + if consume: + LOG.warn("Failed job consumption: %s", job, + exc_info=True) + else: + LOG.warn("Failed job abandonment: %s", job, + exc_info=True) + if dispatched == 0 and not self._wait_timeout.is_stopped(): + 
self._wait_timeout.wait() + finally: + self._dead.set() diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 95e378a..f3d2109 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -61,6 +61,12 @@ class StorageFailure(TaskFlowException): """Raised when storage backends can not be read/saved/deleted.""" +# Conductor related errors. + +class ConductorFailure(TaskFlowException): + """Errors related to conducting activities.""" + + # Job related errors. class JobFailure(TaskFlowException): diff --git a/taskflow/jobs/job.py b/taskflow/jobs/job.py index a4f0b41..796e5d1 100644 --- a/taskflow/jobs/job.py +++ b/taskflow/jobs/job.py @@ -99,3 +99,8 @@ class Job(object): def name(self): """The non-uniquely identifying name of this job.""" return self._name + + def __str__(self): + """Pretty formats the job into something *more* meaningful.""" + return "%s %s (%s): %s" % (type(self).__name__, + self.name, self.uuid, self.details) diff --git a/taskflow/jobs/jobboard.py b/taskflow/jobs/jobboard.py index 40c1179..5857d55 100644 --- a/taskflow/jobs/jobboard.py +++ b/taskflow/jobs/jobboard.py @@ -154,6 +154,10 @@ class JobBoard(object): this must be the same name that was used for claiming this job. """ + @abc.abstractproperty + def connected(self): + """Returns if this jobboard is connected.""" + @abc.abstractmethod def connect(self): """Opens the connection to any backend system.""" diff --git a/taskflow/tests/unit/conductor/__init__.py b/taskflow/tests/unit/conductor/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/taskflow/tests/unit/conductor/__init__.py diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py new file mode 100644 index 0000000..7ac75d9 --- /dev/null +++ b/taskflow/tests/unit/conductor/test_conductor.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import threading + +from zake import fake_client + +from taskflow.conductors import single_threaded as stc +from taskflow import engines +from taskflow.jobs.backends import impl_zookeeper +from taskflow.jobs import jobboard +from taskflow.patterns import linear_flow as lf +from taskflow.persistence.backends import impl_memory +from taskflow import states as st +from taskflow import test +from taskflow.tests import utils as test_utils +from taskflow.utils import misc +from taskflow.utils import persistence_utils as pu + + +@contextlib.contextmanager +def close_many(*closeables): + try: + yield + finally: + for c in closeables: + c.close() + + +def test_factory(blowup): + f = lf.Flow("test") + if not blowup: + f.add(test_utils.SaveOrderTask('test1')) + else: + f.add(test_utils.FailingTask("test1")) + return f + + +def make_thread(conductor): + t = threading.Thread(target=conductor.run) + t.daemon = True + return t + + +class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): + def make_components(self, name='testing', wait_timeout=0.1): + client = fake_client.FakeClient() + persistence = impl_memory.MemoryBackend() + board = impl_zookeeper.ZookeeperJobBoard(name, {}, + client=client, + persistence=persistence) + engine_conf = { + 'engine': 'default', + } + conductor = stc.SingleThreadedConductor(name, board, engine_conf, + persistence, wait_timeout) + return 
misc.AttrDict(board=board, + client=client, + persistence=persistence, + conductor=conductor) + + def test_connection(self): + components = self.make_components() + components.conductor.connect() + with close_many(components.conductor, components.client): + self.assertTrue(components.board.connected) + self.assertTrue(components.client.connected) + self.assertFalse(components.board.connected) + self.assertFalse(components.client.connected) + + def test_run_empty(self): + components = self.make_components() + components.conductor.connect() + with close_many(components.conductor, components.client): + t = make_thread(components.conductor) + t.start() + self.assertFalse(components.conductor.stop(0.5)) + t.join() + + def test_run(self): + components = self.make_components() + components.conductor.connect() + consumed_event = threading.Event() + + def on_consume(state, details): + consumed_event.set() + + components.board.notifier.register(jobboard.REMOVAL, on_consume) + with close_many(components.conductor, components.client): + t = make_thread(components.conductor) + t.start() + lb, fd = pu.temporary_flow_detail(components.persistence) + engines.save_factory_details(fd, test_factory, + [False], {}, + backend=components.persistence) + components.board.post('poke', lb, + details={'flow_uuid': fd.uuid}) + consumed_event.wait(1.0) + self.assertTrue(consumed_event.is_set()) + components.conductor.stop(1.0) + self.assertFalse(components.conductor.dispatching) + + persistence = components.persistence + with contextlib.closing(persistence.get_connection()) as conn: + lb = conn.get_logbook(lb.uuid) + fd = lb.find(fd.uuid) + self.assertIsNotNone(fd) + self.assertEqual(st.SUCCESS, fd.state) + + def test_fail_run(self): + components = self.make_components() + components.conductor.connect() + + consumed_event = threading.Event() + + def on_consume(state, details): + consumed_event.set() + + components.board.notifier.register(jobboard.REMOVAL, on_consume) + with 
close_many(components.conductor, components.client): + t = make_thread(components.conductor) + t.start() + lb, fd = pu.temporary_flow_detail(components.persistence) + engines.save_factory_details(fd, test_factory, + [True], {}, + backend=components.persistence) + components.board.post('poke', lb, + details={'flow_uuid': fd.uuid}) + consumed_event.wait(1.0) + self.assertTrue(consumed_event.is_set()) + components.conductor.stop(1.0) + self.assertFalse(components.conductor.dispatching) + + persistence = components.persistence + with contextlib.closing(persistence.get_connection()) as conn: + lb = conn.get_logbook(lb.uuid) + fd = lb.find(fd.uuid) + self.assertIsNotNone(fd) + self.assertEqual(st.REVERTED, fd.state) |