From c5fff210535097ba169c26b1dbebe18762f14446 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 7 May 2013 10:49:44 -0700 Subject: Move the code over for now --- LICENSE | 176 ++++++++++++++++++++++++++++++++++ setup.py | 0 taskflow/__init__.py | 37 +++++++ taskflow/job.py | 59 ++++++++++++ taskflow/jobboard/__init__.py | 26 +++++ taskflow/jobboard/api.py | 95 ++++++++++++++++++ taskflow/jobboard/drivers/__init__.py | 17 ++++ taskflow/jobboard/drivers/db.py | 18 ++++ taskflow/jobboard/drivers/memory.py | 18 ++++ taskflow/jobboard/drivers/mq.py | 18 ++++ taskflow/jobboard/drivers/zk.py | 39 ++++++++ taskflow/locks/__init__.py | 26 +++++ taskflow/locks/api.py | 128 +++++++++++++++++++++++++ taskflow/locks/drivers/__init__.py | 17 ++++ taskflow/locks/drivers/db.py | 28 ++++++ taskflow/locks/drivers/file.py | 28 ++++++ taskflow/locks/drivers/memory.py | 28 ++++++ taskflow/locks/drivers/zk.py | 62 ++++++++++++ taskflow/logbook/__init__.py | 26 +++++ taskflow/logbook/api.py | 151 +++++++++++++++++++++++++++++ taskflow/logbook/drivers/db.py | 17 ++++ taskflow/logbook/drivers/memory.py | 17 ++++ taskflow/logbook/drivers/zk.py | 108 +++++++++++++++++++++ taskflow/patterns/__init__.py | 17 ++++ taskflow/patterns/linear_workflow.py | 139 +++++++++++++++++++++++++++ taskflow/reservation.py | 30 ++++++ taskflow/task.py | 45 +++++++++ 27 files changed, 1370 insertions(+) create mode 100644 LICENSE create mode 100644 setup.py create mode 100644 taskflow/__init__.py create mode 100644 taskflow/job.py create mode 100644 taskflow/jobboard/__init__.py create mode 100644 taskflow/jobboard/api.py create mode 100644 taskflow/jobboard/drivers/__init__.py create mode 100644 taskflow/jobboard/drivers/db.py create mode 100644 taskflow/jobboard/drivers/memory.py create mode 100644 taskflow/jobboard/drivers/mq.py create mode 100644 taskflow/jobboard/drivers/zk.py create mode 100644 taskflow/locks/__init__.py create mode 100644 taskflow/locks/api.py create mode 100644 taskflow/locks/drivers/__init__.py create mode 100644 taskflow/locks/drivers/db.py create mode 100644 taskflow/locks/drivers/file.py create mode 100644 taskflow/locks/drivers/memory.py create mode 100644 taskflow/locks/drivers/zk.py create mode 100644 taskflow/logbook/__init__.py create mode 100644 taskflow/logbook/api.py create mode 100644 taskflow/logbook/drivers/db.py create mode 100644 taskflow/logbook/drivers/memory.py create mode 100644 taskflow/logbook/drivers/zk.py create mode 100644 taskflow/patterns/__init__.py create mode 100644 taskflow/patterns/linear_workflow.py create mode 100644 taskflow/reservation.py create mode 100644 taskflow/task.py diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c332405 --- /dev/null +++ b/LICENSE @@ -0,0 +1,176 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..e69de29 diff --git a/taskflow/__init__.py b/taskflow/__init__.py new file mode 100644 index 0000000..cd51019 --- /dev/null +++ b/taskflow/__init__.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +# Useful to know when other tasks are being activated and finishing. +STARTING = 'STARTING' +COMPLETED = 'COMPLETED' +ERRORED = 'ERRORED' + + +class Failure(object): + """When a task failure occurs the following object will be given to revert + and can be used to interrogate what caused the failure.""" + + def __init__(self, task, name, workflow, exception): + self.task = task + self.name = name + self.workflow = workflow + self.exception = exception + + + diff --git a/taskflow/job.py b/taskflow/job.py new file mode 100644 index 0000000..8873083 --- /dev/null +++ b/taskflow/job.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +CLAIMED = 'claimed' +UNCLAIMED = 'unclaimed' + + +class Job(object): + __metaclass__ = abc.ABCMeta + + def __init__(self, name, type, reservation): + self.name = name + # A link back to the reservation which + # can be used to query information about + # this job (and its subsequent + # workflows and tasks). + self.reservation = reservation + # TBD - likely more details about this job + self.details = None + self.state = UNCLAIMED + self.owner = None + + @abc.abstractproperty + def type(self): + # Returns which type of job this is. + # + # For example, a 'run_instance' job, or a 'delete_instance' job could + # be possible types. + raise NotImplementedError() + + @abc.abstractmethod + def claim(self, owner): + # This can be used to transition this job from unclaimed to claimed. + raise NotImplementedError() + + @abc.abstractmethod + def consume(self): + # This can be used to transition this job from active to finished. + # + # During said transition the job and any details of it may be removed + # from some backing storage (if applicable). + raise NotImplementedError() diff --git a/taskflow/jobboard/__init__.py b/taskflow/jobboard/__init__.py new file mode 100644 index 0000000..8ae8604 --- /dev/null +++ b/taskflow/jobboard/__init__.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +The jobboard service for Nova. Different implementations can be plugged +according to the Nova configuration. +""" + +from nova.workflow.jobboard import api + +API = api.API diff --git a/taskflow/jobboard/api.py b/taskflow/jobboard/api.py new file mode 100644 index 0000000..f5ff742 --- /dev/null +++ b/taskflow/jobboard/api.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import contextlib + +from oslo.config import cfg + +from nova.openstack.common import importutils +from nova.openstack.common import log as logging + +LOG = logging.getLogger(__name__) +jobboard_driver_opt = cfg.StrOpt('job_board_driver', + default='memory', + help='The driver that satisfies the ' + 'job providing service ( ' + 'valid options are: db, mq, zk, memory)') + +CONF = cfg.CONF +CONF.register_opts([jobboard_driver_opt]) + + +class API(object): + + _driver = None + _driver_name_class_mapping = { + # TBD + } + + def __new__(cls, *args, **kwargs): + '''Create an instance of the lock provider API. + + args and kwargs are passed down to the job board driver when it gets + created. + ''' + if not cls._driver: + LOG.debug(_('Job board driver defined as an instance of %s'), + str(CONF.job_board_driver)) + driver_name = CONF.job_board_driver + try: + driver_class = cls._driver_name_class_mapping[driver_name] + except KeyError: + raise TypeError(_("Unknown job board driver name: %s") + % driver_name) + cls._driver = importutils.import_object(driver_class, + *args, **kwargs) + utils.check_isinstance(cls._driver, JobBoardDriver) + return super(API, cls).__new__(cls) + + +class JobBoardDriver(object): + """Base class for job board drivers.""" + + __metaclass__ = abc.ABCMeta + + def __init__(self): + self._listeners = [] + + @abc.abstractmethod + def post(self, job): + raise NotImplementedError() + + def _notify_posted(self, job): + for i in self._listeners: + i.notify_posted(job) + + @abc.abstractmethod + def await(self, blocking=True): + raise NotImplementedError() + + def subscribe(self, listener): + self._listeners.append(listener) + + def unsubscribe(self, listener): + if listener in self._listeners: + self._listeners.remove(listener) + + def close(self): + """Allows the job board provider to free any resources that it has.""" + pass diff --git a/taskflow/jobboard/drivers/__init__.py b/taskflow/jobboard/drivers/__init__.py new file mode 100644 index 0000000..830dd2e --- /dev/null +++ b/taskflow/jobboard/drivers/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/taskflow/jobboard/drivers/db.py b/taskflow/jobboard/drivers/db.py new file mode 100644 index 0000000..27ef755 --- /dev/null +++ b/taskflow/jobboard/drivers/db.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + diff --git a/taskflow/jobboard/drivers/memory.py b/taskflow/jobboard/drivers/memory.py new file mode 100644 index 0000000..27ef755 --- /dev/null +++ b/taskflow/jobboard/drivers/memory.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + diff --git a/taskflow/jobboard/drivers/mq.py b/taskflow/jobboard/drivers/mq.py new file mode 100644 index 0000000..27ef755 --- /dev/null +++ b/taskflow/jobboard/drivers/mq.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + diff --git a/taskflow/jobboard/drivers/zk.py b/taskflow/jobboard/drivers/zk.py new file mode 100644 index 0000000..afb3466 --- /dev/null +++ b/taskflow/jobboard/drivers/zk.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kazoo import client as kazoo_client + +from nova.workflow.jobboard import api + +from oslo.config import cfg + + +CONF = cfg.CONF +CONF.import_opt('address', 'nova.servicegroup.zk', group='zookeeper') +CONF.import_opt('recv_timeout', 'nova.servicegroup.zk', group='zookeeper') + + +class JobBoard(api.JobBoardDriver): + def __init__(self): + super(JobBoard, self).__init__() + self._client = kazoo_client.KazooClient(hosts=CONF.address, + timeout=CONF.recv_timeout) + self._client.start() + + def post(self, job): + diff --git a/taskflow/locks/__init__.py b/taskflow/locks/__init__.py new file mode 100644 index 0000000..f869bbc --- /dev/null +++ b/taskflow/locks/__init__.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +The lock provider service for Nova. Different implementations can be plugged +according to the Nova configuration. +""" + +from nova.workflow.locks import api + +API = api.API diff --git a/taskflow/locks/api.py b/taskflow/locks/api.py new file mode 100644 index 0000000..335767a --- /dev/null +++ b/taskflow/locks/api.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import contextlib + +from oslo.config import cfg + +from nova.openstack.common import importutils +from nova.openstack.common import log as logging +from nova import utils + + +LOG = logging.getLogger(__name__) +lockprovider_driver_opt = cfg.StrOpt('lock_provider_driver', + default='memory', + help='The driver that satisfies the ' + 'lock providing service (' + 'valid options are: db, file, ' + 'memory, zk)') + +CONF = cfg.CONF +CONF.register_opts([lockprovider_driver_opt]) + + +class API(object): + + _driver = None + _driver_name_class_mapping = { + # This can attempt to provide a distributed lock but extreme! care + # has to be taken to know when to expire a database lock, as well as + # extreme! care has to be taken when acquiring said lock. It is likely + # not possible to guarantee locking consistently and correctness when + # using a database to do locking. + 'db': 'nova.locks.drivers.db.LockProvider', + # This can provide per system level locks but can not provide across + # system level locks. + 'file': 'nova.locks.drivers.file.LockProvider', + # Note this driver can be used for distributed locking of resources. + 'zk': 'nova.locks.drivers.zk.LockProvider', + # This driver is pretty much only useful for testing since it can + # only provide per-process locking using greenlet/thread level locking. + 'memory': 'nova.locks.drivers.memory.LockProvider', + } + + def __new__(cls, *args, **kwargs): + '''Create an instance of the lock provider API. + + args and kwargs are passed down to the lock provider driver when it + gets created (if applicable). + ''' + if not cls._driver: + LOG.debug(_('Lock provider driver defined as an instance of %s'), + str(CONF.lock_provider_driver)) + driver_name = CONF.lock_provider_driver + try: + driver_class = cls._driver_name_class_mapping[driver_name] + except KeyError: + raise TypeError(_("Unknown lock provider driver name: %s") + % driver_name) + cls._driver = importutils.import_object(driver_class, + *args, **kwargs) + utils.check_isinstance(cls._driver, LockProvider) + return super(API, cls).__new__(cls) + + +class Lock(object): + """Base class for what a lock (distributed or local or in-between) should + provide""" + + __metaclass__ = abc.ABCMeta + + def __init__(self, resource_uri, blocking=True): + self.uri = resource_uri + self.blocking = blocking + + @abc.abstractmethod + def acquire(self): + raise NotImplementedError() + + @abc.abstractmethod + def release(self): + raise NotImplementedError() + + @abc.abstractmethod + def is_locked(self): + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + raise NotImplementedError() + + def __enter__(self): + self.acquire() + + def __exit__(self, type, value, traceback): + self.release() + + +class LockProvider(object): + """Base class for lock provider drivers.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def provide(self, resource_uri, blocking=True): + """Returns a single lock object which can be used to acquire the lock + on the given resource uri""" + raise NotImplementedError() + + def close(self): + """Allows the lock provider to free any resources that it has.""" + pass diff --git a/taskflow/locks/drivers/__init__.py b/taskflow/locks/drivers/__init__.py new file mode 100644 index 0000000..830dd2e --- /dev/null +++ b/taskflow/locks/drivers/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/taskflow/locks/drivers/db.py b/taskflow/locks/drivers/db.py new file mode 100644 index 0000000..214332b --- /dev/null +++ b/taskflow/locks/drivers/db.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from nova.workflow.locks import api + + +class Lock(api.Lock): + pass + + +class LockProvider(api.LockProvider): + def provide(self, resource_uri, blocking=True): + return Lock(resource_uri, blocking) diff --git a/taskflow/locks/drivers/file.py b/taskflow/locks/drivers/file.py new file mode 100644 index 0000000..214332b --- /dev/null +++ b/taskflow/locks/drivers/file.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from nova.workflow.locks import api + + +class Lock(api.Lock): + pass + + +class LockProvider(api.LockProvider): + def provide(self, resource_uri, blocking=True): + return Lock(resource_uri, blocking) diff --git a/taskflow/locks/drivers/memory.py b/taskflow/locks/drivers/memory.py new file mode 100644 index 0000000..214332b --- /dev/null +++ b/taskflow/locks/drivers/memory.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from nova.workflow.locks import api + + +class Lock(api.Lock): + pass + + +class LockProvider(api.LockProvider): + def provide(self, resource_uri, blocking=True): + return Lock(resource_uri, blocking) diff --git a/taskflow/locks/drivers/zk.py b/taskflow/locks/drivers/zk.py new file mode 100644 index 0000000..e4eaac2 --- /dev/null +++ b/taskflow/locks/drivers/zk.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kazoo import client as kazoo_client +from kazoo.recipe import lock as kazoo_lock + +from nova.workflow.locks import api + +from oslo.config import cfg + + +CONF = cfg.CONF +CONF.import_opt('address', 'nova.servicegroup.zk', group='zookeeper') +CONF.import_opt('recv_timeout', 'nova.servicegroup.zk', group='zookeeper') + + +class Lock(api.Lock): + def __init__(self, resource_uri, blocking, client): + super(Lock, self).__init__(resource_uri, blocking) + self._client = client + self._lock = kazoo_lock.Lock(client, resource_uri) + + def acquire(self): + return self._lock.acquire(self._blocking) + + def is_locked(self): + return self._lock.is_acquired + + def release(self): + return self._lock.release() + + def cancel(self): + return self._lock.cancel() + + +class LockProvider(api.LockProvider): + def __init__(self, *args, **kwargs): + self._client = kazoo_client.KazooClient(hosts=CONF.address, + timeout=CONF.recv_timeout) + self._client.start() + + def provide(self, resource_uri, blocking=True): + return Lock(self._client, resource_uri, blocking) + + def close(self): + if self._client: + self._client.stop() diff --git a/taskflow/logbook/__init__.py b/taskflow/logbook/__init__.py new file mode 100644 index 0000000..ed33d8e --- /dev/null +++ b/taskflow/logbook/__init__.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +The logbook provider service for Nova. Different implementations can be plugged +according to the Nova configuration. +""" + +from nova.workflow.logbook import api + +API = api.API diff --git a/taskflow/logbook/api.py b/taskflow/logbook/api.py new file mode 100644 index 0000000..d130afa --- /dev/null +++ b/taskflow/logbook/api.py @@ -0,0 +1,151 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +from oslo.config import cfg + +"""Define APIs for the logbook providers.""" + +from nova.openstack.common import importutils +from nova.openstack.common import log as logging + +LOG = logging.getLogger(__name__) +logprovider_driver_opt = cfg.StrOpt('logbook_provider_driver', + default='memory', + help='The driver for that satisfies the ' + 'remote lock providing service ( ' + 'valid options are: db, memory, zk)') + +CONF = cfg.CONF +CONF.register_opts([logprovider_driver_opt]) + + +class API(object): + + _driver = None + _driver_name_class_mapping = { + # Note these drivers can be used for persistant logging. + 'zk': 'nova.logbook.drivers.zk.ZooKeeperProviderDriver', + 'db': 'nova.logbook.drivers.db.DBProviderDriver', + + # This driver is pretty much only useful for testing. + 'memory': 'nova.logbook.drivers.memory.MemoryProviderDriver', + } + + def __new__(cls, *args, **kwargs): + '''Create an instance of the logbook provider API. + + args and kwargs are passed down to the logbook provider driver when it + gets created. + ''' + if not cls._driver: + LOG.debug(_('Logbook provider driver defined as an instance of %s'), + str(CONF.logbook_provider_driver)) + driver_name = CONF.logbook_provider_driver + try: + driver_class = cls._driver_name_class_mapping[driver_name] + except KeyError: + raise TypeError(_("Unknown logbook provider driver name: %s") + % driver_name) + cls._driver = importutils.import_object(driver_class, + *args, **kwargs) + utils.check_isinstance(cls._driver, LogBookProviderDriver) + return super(API, cls).__new__(cls) + + +class RecordNotFound(Exception): + pass + + +class LogBook(object): + """Base class for what a logbook (distributed or local or in-between) + should provide""" + + __metaclass__ = abc.ABCMeta + + def __init__(self, resource_uri): + self.uri = resource_uri + + @abc.abstractmethod + def add_record(self, name, metadata=None): + """Atomically adds a new entry to the given logbook with the supplied + metadata (if any).""" + raise NotImplementedError() + + @abc.abstractmethod + def fetch_record(self, name): + """Fetchs a record with the given name and returns any metadata about + said record.""" + raise NotImplementedError() + + @abc.abstractmethod + def __contains__(self, name): + """Determines if any entry with the given name exists in this + logbook.""" + raise NotImplementedError() + + @abc.abstractmethod + def mark(self, name, metadata, merge_functor=None): + """Marks the given logbook entry (which must exist) with the given + metadata, if said entry already exists then the provided merge functor + or a default function, will be activated to merge the existing metadata + with the supplied metadata.""" + raise NotImplementedError() + + @abc.abstractmethod + def __iter__(self): + """Iterates over all names and metadata and provides back both of these + via a (name, metadata) tuple. The order will be in the same order that + they were added.""" + raise NotImplementedError() + + +class LogBookProvider(object): + """Base class for logbook provider drivers.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def provide(self, resource_uri): + """Returns a new (if not existent) logbook object or used logbook + object (if already existent) which can be used to determine the actions + performed on a given resource uri""" + raise NotImplementedError() + + def close(self): + """Allows the log provider to free any resources that it has.""" + pass diff --git a/taskflow/logbook/drivers/db.py b/taskflow/logbook/drivers/db.py new file mode 100644 index 0000000..830dd2e --- /dev/null +++ b/taskflow/logbook/drivers/db.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/taskflow/logbook/drivers/memory.py b/taskflow/logbook/drivers/memory.py new file mode 100644 index 0000000..830dd2e --- /dev/null +++ b/taskflow/logbook/drivers/memory.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/taskflow/logbook/drivers/zk.py b/taskflow/logbook/drivers/zk.py new file mode 100644 index 0000000..b65dc38 --- /dev/null +++ b/taskflow/logbook/drivers/zk.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kazoo import client as kazoo_client + +from nova.openstack.common import jsonutils +from nova.workflow.logbook import api + +from oslo.config import cfg + + +CONF = cfg.CONF +CONF.import_opt('address', 'nova.servicegroup.zk', group='zookeeper') +CONF.import_opt('recv_timeout', 'nova.servicegroup.zk', group='zookeeper') + + +class LogBook(api.LogBook): + prefix = "entry-" + + def __init__(self, client, resource_uri): + super(LogBook, self).__init__(self, "%s/logbook" % (resource_uri)) + self._client = client + self._paths_made = False + self._paths = [self.uri] + + def _ensure_paths(self): + if self._paths_made: + return + for p in self._paths: + self._client.ensure_path(p) + self._paths_made = True + + def add_record(self, name, metadata=None): + self._ensure_paths() + (path, value) = self._make_storage(name, metadata) + self._client.create(path, jsonutils.dumps(value), sequence=True) + + def _make_storage(self, name, metadata): + path = "{root}/{prefix}".format(root=self.uri, prefix=self.prefix) + value = { + 'name': name, + 'metadata': metadata, + } + return (path, value) + + def __iter__(self): + for c in self._client.get_children(self.uri + "/"): + if not c.startswith(self.prefix): + continue + (value, _zk) = self._client.get("%s/%s" % (self.uri, c)) + value = jsonutils.loads(value) + yield (value['name'], value['metadata']) + + def __contains__(self, name): + for (n, metadata) in self: + if name == n: + return True + return False + + def mark(self, name, metadata, merge_func=None): + if merge_func is None: + merge_func = lambda old,new : new + for c in self._client.get_children(self.uri + "/"): + if not c.startswith(self.prefix): + continue + (value, _zk) = self._client.get("%s/%s" % (self.uri, c)) + value = jsonutils.loads(value) + if value['name'] == name: + value['metadata'] = merge_func(value['metadata'], metadata) + self._client.set("%s/%s" % (self.uri, c), + jsonutils.dumps(value)) + return + raise api.RecordNotFound() + + def fetch_record(self, name): + for n, metadata in self: + if name == n: + return metadata + raise api.RecordNotFound() + + +class LogBookProvider(api.LogBookProvider): + def __init__(self, *args, **kwargs): + self._client = kazoo_client.KazooClient(hosts=CONF.address, + timeout=CONF.recv_timeout) + self._client.start() + + def close(self): + if self._client: + self._client.stop() + + def provide(self, resource_uri): + return LogBook(self._client, resource_uri) diff --git a/taskflow/patterns/__init__.py b/taskflow/patterns/__init__.py new file mode 100644 index 0000000..830dd2e --- /dev/null +++ b/taskflow/patterns/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/taskflow/patterns/linear_workflow.py b/taskflow/patterns/linear_workflow.py new file mode 100644 index 0000000..768b52e --- /dev/null +++ b/taskflow/patterns/linear_workflow.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections as dict_provider +import copy + +# OrderedDict is only in 2.7 or greater :-( +if not hasattr(dict_provider, 'OrderedDict'): + import ordereddict as dict_provider + +from nova import workflow +from nova.openstack.common import excutils +from nova.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +class Workflow(object): + """A linear chain of independent tasks that can be applied as one unit or + rolled back as one unit.""" + + def __init__(self, name, tolerant=False, parents=None): + # The tasks which have been applied will be collected here so that they + # can be reverted in the correct order on failure. + self.reversions = [] + self.name = name + # If this chain can ignore individual task reversion failure then this + # should be set to true, instead of the default value of false. + self.tolerant = tolerant + # Ordered dicts are used so that we can nicely refer to the tasks by + # name and easily fetch there results but also allow for the running + # of said tasks to happen in a linear order. + self.tasks = dict_provider.OrderedDict() + self.results = dict_provider.OrderedDict() + # If this workflow has a parent workflow/s which need to be reverted if + # this workflow fails then please include them here to allow this child + # to call the parents... + self.parents = parents + # This should be a functor that returns whether a given task has + # already ran by returning the return value of the task or returning + # 'None' if the task has not ran. + # + # NOTE(harlowja): This allows for resumption by skipping tasks which + # have already occurred. The previous return value is needed due to + # the contract we have with tasks that they will be given the value + # they returned if reversion is triggered. + self.result_fetcher = None + # Any objects that want to listen when a task starts/stops/completes + # or errors should be registered here. This can be used to monitor + # progress and record tasks finishing (so that it becomes possible to + # store the result of a task in some persistent or semi-persistent + # storage backend). + self.listeners = [] + + def __setitem__(self, name, task): + self.tasks[name] = task + + def __getitem__(self, name): + return self.results[name] + + def run(self, context, *args, **kwargs): + for (name, task) in self.tasks.iteritems(): + try: + self._on_task_start(context, task, name) + # See if we have already ran this... + result = None + if self.result_fetcher: + result = self.result_fetcher(context, name, self) + if result is None: + result = task.apply(context, *args, **kwargs) + # Keep a pristine copy of the result in the results table + # so that if said result is altered by other further states + # the one here will not be. + self.results[name] = copy.deepcopy(result) + self._on_task_finish(context, task, name, result) + except Exception as ex: + with excutils.save_and_reraise_exception(): + try: + self._on_task_error(context, task, name) + except Exception: + LOG.exception(_("Dropping exception catched when" + " notifying about existing task" + " exception.")) + self.rollback(context, + workflow.Failure(task, name, self, ex)) + + def _on_task_error(self, context, task, name): + # Notify any listeners that the task has errored. + for i in self.listeners: + i.notify(context, workflow.ERRORED, self, task, name) + + def _on_task_start(self, context, task, name): + # Notify any listeners that we are about to start the given task. + for i in self.listeners: + i.notify(context, workflow.STARTING, self, task, name) + + def _on_task_finish(self, context, task, name, result): + # Notify any listeners that we are finishing the given task. + self.reversions.append((name, task)) + for i in self.listeners: + i.notify(context, workflow.COMPLETED, self, task, + name, result=result) + + def rollback(self, context, cause): + for (i, (name, task)) in enumerate(reversed(self.reversions)): + try: + task.revert(context, self.results[name], cause) + except Exception: + # Ex: WARN: Failed rolling back stage 1 (validate_request) of + # chain validation due to Y exception. + msg = _("Failed rolling back stage %(index)s (%(name)s)" + " of workflow %(workflow)s, due to inner exception.") + LOG.warn(msg % {'index': (i + 1), 'stage': name, + 'workflow': self.name}) + if not self.tolerant: + # NOTE(harlowja): LOG a msg AND re-raise the exception if + # the chain does not tolerate exceptions happening in the + # rollback method. + raise + if self.parents: + # Rollback any parents workflows if they exist... + for p in self.parents: + p.rollback(context, cause) + diff --git a/taskflow/reservation.py b/taskflow/reservation.py new file mode 100644 index 0000000..bb0a279 --- /dev/null +++ b/taskflow/reservation.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class Reservation(object): + """This is an abstraction of a promise to complete some type of job which + can be returned to a user for later lookup on the progress of said + promise""" + + def __init__(self, for_who, id): + self.for_who = for_who + self.id = id + + def __str__(self): + return "Reservation: '%s' for '%s'" % (self.id, self.for_who) diff --git a/taskflow/task.py b/taskflow/task.py new file mode 100644 index 0000000..c5133f3 --- /dev/null +++ b/taskflow/task.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + + +class Task(object): + """An abstraction that defines a potential piece of work that can be + applied and can be reverted to undo the work as a single unit. + """ + __metaclass__ = abc.ABCMeta + + def __str__(self): + return "Task: %s" % (self.__class__.__name__) + + @abc.abstractmethod + def apply(self, context, *args, **kwargs): + """Activate a given task which will perform some operation and return. + + This method can be used to apply some given context and given set + of args and kwargs to accomplish some goal. Note that the result + that is returned needs to be serializable so that it can be passed + back into this task if reverting is triggered.""" + raise NotImplementedError() + + def revert(self, context, result, cause): + """Revert this task using the given context, result that the apply + provided as well as any information which may have caused + said reversion.""" + pass -- cgit v1.2.1