summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-09-22 21:20:15 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2014-09-23 15:16:13 -0700
commitdbf117aaf5eaf59a8b5d3c0516add2b708ce3e73 (patch)
tree674b81068b6c5f7cf09bc2e99a334118f6836c24
parent2a8dbc54f6025818ce9f6792fd7eeff356924b76 (diff)
downloadtaskflow-dbf117aaf5eaf59a8b5d3c0516add2b708ce3e73.tar.gz
Documentation cleanups and tweaks
Apply some adjustments on the docs by rewording certain statements, linking to the correct classes and methods and linking to the correct exceptions to make it easier for users to follow the docs and there associated types and links. Change-Id: I03aac77c814fc4c376003f09a45559a0995b3c6c
-rw-r--r--doc/source/arguments_and_results.rst180
-rw-r--r--doc/source/atoms.rst2
-rw-r--r--doc/source/engines.rst58
-rw-r--r--doc/source/inputs_and_outputs.rst27
-rw-r--r--doc/source/notifications.rst27
-rw-r--r--doc/source/types.rst10
-rw-r--r--taskflow/engines/worker_based/worker.py38
-rw-r--r--taskflow/utils/misc.py31
8 files changed, 200 insertions, 173 deletions
diff --git a/doc/source/arguments_and_results.rst b/doc/source/arguments_and_results.rst
index e23a637..12ad6ed 100644
--- a/doc/source/arguments_and_results.rst
+++ b/doc/source/arguments_and_results.rst
@@ -1,32 +1,35 @@
-==========================
-Atom Arguments and Results
-==========================
+=====================
+Arguments and results
+=====================
.. |task.execute| replace:: :py:meth:`~taskflow.task.BaseTask.execute`
.. |task.revert| replace:: :py:meth:`~taskflow.task.BaseTask.revert`
.. |retry.execute| replace:: :py:meth:`~taskflow.retry.Retry.execute`
.. |retry.revert| replace:: :py:meth:`~taskflow.retry.Retry.revert`
-
-In TaskFlow, all flow and task state goes to (potentially persistent) storage.
-That includes all the information that :doc:`atoms <atoms>` (e.g. tasks) in the
-flow need when they are executed, and all the information task produces (via
-serializable task results). A developer who implements tasks or flows can
-specify what arguments a task accepts and what result it returns in several
-ways. This document will help you understand what those ways are and how to use
-those ways to accomplish your desired usage pattern.
+.. |Retry| replace:: :py:class:`~taskflow.retry.Retry`
+.. |Task| replace:: :py:class:`Task <taskflow.task.BaseTask>`
+
+In TaskFlow, all flow and task state goes to (potentially persistent) storage
+(see :doc:`persistence <persistence>` for more details). That includes all the
+information that :doc:`atoms <atoms>` (e.g. tasks, retry objects...) in the
+workflow need when they are executed, and all the information task/retry
+produces (via serializable results). A developer who implements tasks/retries
+or flows can specify what arguments a task/retry accepts and what result it
+returns in several ways. This document will help you understand what those ways
+are and how to use those ways to accomplish your desired usage pattern.
.. glossary::
- Task arguments
- Set of names of task arguments available as the ``requires``
- property of the task instance. When a task is about to be executed
- values with these names are retrieved from storage and passed to
- |task.execute| method of the task.
+ Task/retry arguments
+ Set of names of task/retry arguments available as the ``requires``
+ property of the task/retry instance. When a task or retry object is
+ about to be executed values with these names are retrieved from storage
+ and passed to the ``execute`` method of the task/retry.
- Task results
- Set of names of task results (what task provides) available as
- ``provides`` property of task instance. After a task finishes
- successfully, its result(s) (what the task |task.execute| method
+ Task/retry results
+ Set of names of task/retry results (what task/retry provides) available
+ as ``provides`` property of task or retry instance. After a task/retry
+ finishes successfully, its result(s) (what the ``execute`` method
returns) are available by these names from storage (see examples
below).
@@ -44,8 +47,8 @@ There are different ways to specify the task argument ``requires`` set.
Arguments inference
-------------------
-Task arguments can be inferred from arguments of the |task.execute| method of
-the task.
+Task/retry arguments can be inferred from arguments of the |task.execute|
+method of a task (or the |retry.execute| of a retry object).
.. doctest::
@@ -56,10 +59,10 @@ the task.
>>> sorted(MyTask().requires)
['eggs', 'spam']
-Inference from the method signature is the ''simplest'' way to specify task
+Inference from the method signature is the ''simplest'' way to specify
arguments. Optional arguments (with default values), and special arguments like
-``self``, ``*args`` and ``**kwargs`` are ignored on inference (as these names
-have special meaning/usage in python).
+``self``, ``*args`` and ``**kwargs`` are ignored during inference (as these
+names have special meaning/usage in python).
.. doctest::
@@ -83,14 +86,14 @@ have special meaning/usage in python).
Rebinding
---------
-**Why:** There are cases when the value you want to pass to a task is stored
-with a name other then the corresponding task arguments name. That's when the
-``rebind`` task constructor parameter comes in handy. Using it the flow author
+**Why:** There are cases when the value you want to pass to a task/retry is
+stored with a name other then the corresponding arguments name. That's when the
+``rebind`` constructor parameter comes in handy. Using it the flow author
can instruct the engine to fetch a value from storage by one name, but pass it
-to a tasks |task.execute| method with another name. There are two possible ways
-of accomplishing this.
+to a tasks/retrys ``execute`` method with another name. There are two possible
+ways of accomplishing this.
-The first is to pass a dictionary that maps the task argument name to the name
+The first is to pass a dictionary that maps the argument name to the name
of a saved value.
For example, if you have task::
@@ -100,24 +103,25 @@ For example, if you have task::
def execute(self, vm_name, vm_image_id, **kwargs):
pass # TODO(imelnikov): use parameters to spawn vm
-and you saved 'vm_name' with 'name' key in storage, you can spawn a vm with
-such 'name' like this::
+and you saved ``'vm_name'`` with ``'name'`` key in storage, you can spawn a vm
+with such ``'name'`` like this::
SpawnVMTask(rebind={'vm_name': 'name'})
The second way is to pass a tuple/list/dict of argument names. The length of
-the tuple/list/dict should not be less then number of task required parameters.
+the tuple/list/dict should not be less then number of required parameters.
+
For example, you can achieve the same effect as the previous example with::
SpawnVMTask(rebind_args=('name', 'vm_image_id'))
-which is equivalent to a more elaborate::
+This is equivalent to a more elaborate::
SpawnVMTask(rebind=dict(vm_name='name',
vm_image_id='vm_image_id'))
-In both cases, if your task accepts arbitrary arguments with ``**kwargs``
-construct, you can specify extra arguments.
+In both cases, if your task (or retry) accepts arbitrary arguments
+with the ``**kwargs`` construct, you can specify extra arguments.
::
@@ -158,7 +162,8 @@ arguments) will appear in the ``kwargs`` of the |task.execute| method.
When constructing a task instance the flow author can also add more
requirements if desired. Those manual requirements (if they are not functional
-arguments) will appear in the ``**kwargs`` the |task.execute| method.
+arguments) will appear in the ``kwargs`` parameter of the |task.execute|
+method.
.. doctest::
@@ -189,12 +194,13 @@ avoid invalid argument mappings.
Results specification
=====================
-In python, function results are not named, so we can not infer what a task
-returns. This is important since the complete task result (what the
-|task.execute| method returns) is saved in (potentially persistent) storage,
-and it is typically (but not always) desirable to make those results accessible
-to other tasks. To accomplish this the task specifies names of those values via
-its ``provides`` task constructor parameter or other method (see below).
+In python, function results are not named, so we can not infer what a
+task/retry returns. This is important since the complete result (what the
+task |task.execute| or retry |retry.execute| method returns) is saved
+in (potentially persistent) storage, and it is typically (but not always)
+desirable to make those results accessible to others. To accomplish this
+the task/retry specifies names of those values via its ``provides`` constructor
+parameter or by its default provides attribute.
Returning one value
-------------------
@@ -242,14 +248,14 @@ tasks) will be able to get those elements from storage by name:
Provides argument can be shorter then the actual tuple returned by a task --
then extra values are ignored (but, as expected, **all** those values are saved
-and passed to the |task.revert| method).
+and passed to the task |task.revert| or retry |retry.revert| method).
.. note::
Provides arguments tuple can also be longer then the actual tuple returned
by task -- when this happens the extra parameters are left undefined: a
warning is printed to logs and if use of such parameter is attempted a
- ``NotFound`` exception is raised.
+ :py:class:`~taskflow.exceptions.NotFound` exception is raised.
Returning a dictionary
----------------------
@@ -290,16 +296,17 @@ will be able to get elements from storage by name:
and passed to the |task.revert| method). If the provides argument has some
items not present in the actual dict returned by the task -- then extra
parameters are left undefined: a warning is printed to logs and if use of
- such parameter is attempted a ``NotFound`` exception is raised.
+ such parameter is attempted a :py:class:`~taskflow.exceptions.NotFound`
+ exception is raised.
Default provides
----------------
-As mentioned above, the default task base class provides nothing, which means
-task results are not accessible to other tasks in the flow.
+As mentioned above, the default base class provides nothing, which means
+results are not accessible to other tasks/retrys in the flow.
-The task author can override this and specify default value for provides using
-``default_provides`` class variable:
+The author can override this and specify default value for provides using
+the ``default_provides`` class/instance variable:
::
@@ -314,8 +321,8 @@ Of course, the flow author can override this to change names if needed:
BitsAndPiecesTask(provides=('b', 'p'))
-or to change structure -- e.g. this instance will make whole tuple accessible
-to other tasks by name 'bnp':
+or to change structure -- e.g. this instance will make tuple accessible
+to other tasks by name ``'bnp'``:
::
@@ -331,26 +338,27 @@ the task from other tasks in the flow (e.g. to avoid naming conflicts):
Revert arguments
================
-To revert a task engine calls its |task.revert| method. This method
-should accept same arguments as |task.execute| method of the task and one
-more special keyword argument, named ``result``.
+To revert a task the :doc:`engine <engines>` calls the tasks
+|task.revert| method. This method should accept the same arguments
+as the |task.execute| method of the task and one more special keyword
+argument, named ``result``.
For ``result`` value, two cases are possible:
-* if task is being reverted because it failed (an exception was raised from its
- |task.execute| method), ``result`` value is instance of
- :py:class:`taskflow.utils.misc.Failure` object that holds exception
- information;
+* If the task is being reverted because it failed (an exception was raised
+ from its |task.execute| method), the ``result`` value is an instance of a
+ :py:class:`~taskflow.utils.misc.Failure` object that holds the exception
+ information.
-* if task is being reverted because some other task failed, and this task
- finished successfully, ``result`` value is task result fetched from storage:
- basically, that's what |task.execute| method returned.
+* If the task is being reverted because some other task failed, and this task
+ finished successfully, ``result`` value is the result fetched from storage:
+ ie, what the |task.execute| method returned.
All other arguments are fetched from storage in the same way it is done for
|task.execute| method.
-To determine if task failed you can check whether ``result`` is instance of
-:py:class:`taskflow.utils.misc.Failure`::
+To determine if a task failed you can check whether ``result`` is instance of
+:py:class:`~taskflow.utils.misc.Failure`::
from taskflow.utils import misc
@@ -366,20 +374,21 @@ To determine if task failed you can check whether ``result`` is instance of
else:
print("do_something returned %r" % result)
-If this task failed (``do_something`` raised exception) it will print ``"This
-task failed, exception:"`` and exception message on revert. If this task
-finished successfully, it will print ``"do_something returned"`` and
-representation of result.
+If this task failed (ie ``do_something`` raised an exception) it will print
+``"This task failed, exception:"`` and a exception message on revert. If this
+task finished successfully, it will print ``"do_something returned"`` and a
+representation of the ``do_something`` result.
Retry arguments
===============
-A Retry controller works with arguments in the same way as a Task. But it has
-an additional parameter 'history' that is a list of tuples. Each tuple contains
-a result of the previous Retry run and a table where a key is a failed task and
-a value is a :py:class:`taskflow.utils.misc.Failure`.
+A |Retry| controller works with arguments in the same way as a |Task|. But
+it has an additional parameter ``'history'`` that is a list of tuples. Each
+tuple contains a result of the previous retry run and a table where the key
+is a failed task and the value is a
+:py:class:`~taskflow.utils.misc.Failure` object.
-Consider the following Retry::
+Consider the following implementation::
class MyRetry(retry.Retry):
@@ -396,19 +405,24 @@ Consider the following Retry::
def revert(self, history, *args, **kwargs):
print history
-Imagine the following Retry had returned a value '5' and then some task 'A'
-failed with some exception. In this case ``on_failure`` method will receive
-the following history::
+Imagine the above retry had returned a value ``'5'`` and then some task ``'A'``
+failed with some exception. In this case the above retrys ``on_failure``
+method will receive the following history::
[('5', {'A': misc.Failure()})]
-Then the |retry.execute| method will be called again and it'll receive the same
-history.
+At this point (since the implementation returned ``RETRY``) the
+|retry.execute| method will be called again and it will receive the same
+history and it can then return a value that subseqent tasks can use to alter
+there behavior.
-If the |retry.execute| method raises an exception, the |retry.revert| method of
-Retry will be called and :py:class:`taskflow.utils.misc.Failure` object will be
-present in the history instead of Retry result::
+If instead the |retry.execute| method raises an exception, the |retry.revert|
+method of the implementation will be called and
+a :py:class:`~taskflow.utils.misc.Failure` object will be present in the
+history instead of the typical result::
[('5', {'A': misc.Failure()}), (misc.Failure(), {})]
-After the Retry has been reverted, the Retry history will be cleaned.
+.. note::
+
+ After a |Retry| has been reverted, the objects history will be cleaned.
diff --git a/doc/source/atoms.rst b/doc/source/atoms.rst
index 8508634..26ca6ad 100644
--- a/doc/source/atoms.rst
+++ b/doc/source/atoms.rst
@@ -1,5 +1,5 @@
------------------------
-Atoms, Tasks and Retries
+Atoms, tasks and retries
------------------------
Atom
diff --git a/doc/source/engines.rst b/doc/source/engines.rst
index 752f9f0..45113a3 100644
--- a/doc/source/engines.rst
+++ b/doc/source/engines.rst
@@ -13,23 +13,23 @@ and uses it to decide which :doc:`atom <atoms>` to run and when.
TaskFlow provides different implementations of engines. Some may be easier to
use (ie, require no additional infrastructure setup) and understand; others
might require more complicated setup but provide better scalability. The idea
-and *ideal* is that deployers or developers of a service that uses TaskFlow can
+and *ideal* is that deployers or developers of a service that use TaskFlow can
select an engine that suites their setup best without modifying the code of
said service.
Engines usually have different capabilities and configuration, but all of them
**must** implement the same interface and preserve the semantics of patterns
-(e.g. parts of :py:class:`linear flow <taskflow.patterns.linear_flow.Flow>`
-are run one after another, in order, even if engine is *capable* of running
-tasks in parallel).
+(e.g. parts of a :py:class:`.linear_flow.Flow`
+are run one after another, in order, even if the selected engine is *capable*
+of running tasks in parallel).
Why they exist
--------------
-An engine being the core component which actually makes your flows progress is
-likely a new concept for many programmers so let's describe how it operates in
-more depth and some of the reasoning behind why it exists. This will hopefully
-make it more clear on there value add to the TaskFlow library user.
+An engine being *the* core component which actually makes your flows progress
+is likely a new concept for many programmers so let's describe how it operates
+in more depth and some of the reasoning behind why it exists. This will
+hopefully make it more clear on there value add to the TaskFlow library user.
First though let us discuss something most are familiar already with; the
difference between `declarative`_ and `imperative`_ programming models. The
@@ -48,15 +48,15 @@ more of a *pure* function that executes, reverts and may require inputs and
provide outputs). This is where engines get involved; they do the execution of
the *what* defined via :doc:`atoms <atoms>`, tasks, flows and the relationships
defined there-in and execute these in a well-defined manner (and the engine is
-responsible for *most* of the state manipulation instead).
+responsible for any state manipulation instead).
This mix of imperative and declarative (with a stronger emphasis on the
-declarative model) allows for the following functionality to be possible:
+declarative model) allows for the following functionality to become possible:
* Enhancing reliability: Decoupling of state alterations from what should be
accomplished allows for a *natural* way of resuming by allowing the engine to
- track the current state and know at which point a flow is in and how to get
- back into that state when resumption occurs.
+ track the current state and know at which point a workflow is in and how to
+ get back into that state when resumption occurs.
* Enhancing scalability: When a engine is responsible for executing your
desired work it becomes possible to alter the *how* in the future by creating
new types of execution backends (for example the worker model which does not
@@ -83,13 +83,14 @@ Of course these kind of features can come with some drawbacks:
away from (and this is likely a mindset change for programmers used to the
imperative model). We have worked to make this less of a concern by creating
and encouraging the usage of :doc:`persistence <persistence>`, to help make
- it possible to have some level of provided state transfer mechanism.
+ it possible to have state and tranfer that state via a argument input and
+ output mechanism.
* Depending on how much imperative code exists (and state inside that code)
- there can be *significant* rework of that code and converting or refactoring
- it to these new concepts. We have tried to help here by allowing you to have
- tasks that internally use regular python code (and internally can be written
- in an imperative style) as well as by providing examples and these developer
- docs; helping this process be as seamless as possible.
+ there *may* be *significant* rework of that code and converting or
+ refactoring it to these new concepts. We have tried to help here by allowing
+ you to have tasks that internally use regular python code (and internally can
+ be written in an imperative style) as well as by providing
+ :doc:`examples <examples>` that show how to use these concepts.
* Another one of the downsides of decoupling the *what* from the *how* is that
it may become harder to use traditional techniques to debug failures
(especially if remote workers are involved). We try to help here by making it
@@ -110,7 +111,7 @@ All engines are mere classes that implement the same interface, and of course
it is possible to import them and create instances just like with any classes
in Python. But the easier (and recommended) way for creating an engine is using
the engine helper functions. All of these functions are imported into the
-`taskflow.engines` module namespace, so the typical usage of these functions
+``taskflow.engines`` module namespace, so the typical usage of these functions
might look like::
from taskflow import engines
@@ -130,8 +131,8 @@ Usage
To select which engine to use and pass parameters to an engine you should use
the ``engine_conf`` parameter any helper factory function accepts. It may be:
-* a string, naming engine type;
-* a dictionary, holding engine type with key ``'engine'`` and possibly
+* A string, naming the engine type.
+* A dictionary, naming engine type with key ``'engine'`` and possibly
type-specific engine configuration parameters.
Single-Threaded
@@ -139,15 +140,20 @@ Single-Threaded
**Engine type**: ``'serial'``
-Runs all tasks on the single thread -- the same thread `engine.run()` is called
-on. This engine is used by default.
+Runs all tasks on a single thread -- the same thread ``engine.run()`` is
+called from.
+
+.. note::
+
+ This engine is used by default.
.. tip::
If eventlet is used then this engine will not block other threads
- from running as eventlet automatically creates a co-routine system (using
- greenthreads and monkey patching). See `eventlet <http://eventlet.net/>`_
- and `greenlet <http://greenlet.readthedocs.org/>`_ for more details.
+ from running as eventlet automatically creates a implicit co-routine
+ system (using greenthreads and monkey patching). See
+ `eventlet <http://eventlet.net/>`_ and
+ `greenlet <http://greenlet.readthedocs.org/>`_ for more details.
Parallel
--------
diff --git a/doc/source/inputs_and_outputs.rst b/doc/source/inputs_and_outputs.rst
index 34fb1ba..e820fef 100644
--- a/doc/source/inputs_and_outputs.rst
+++ b/doc/source/inputs_and_outputs.rst
@@ -1,11 +1,11 @@
==================
-Inputs and Outputs
+Inputs and outputs
==================
In TaskFlow there are multiple ways to provide inputs for your tasks and flows
and get information from them. This document describes one of them, that
involves task arguments and results. There are also :doc:`notifications
-<notifications>`, which allow you to get notified when task or flow changed
+<notifications>`, which allow you to get notified when a task or flow changes
state. You may also opt to use the :doc:`persistence <persistence>` layer
itself directly.
@@ -19,15 +19,16 @@ This is the standard and recommended way to pass data from one task to another.
Of course not every task argument needs to be provided to some other task of a
flow, and not every task result should be consumed by every task.
-If some value is required by one or more tasks of a flow, but is not provided
-by any task, it is considered to be flow input, and **must** be put into the
-storage before the flow is run. A set of names required by a flow can be
-retrieved via that flow's ``requires`` property. These names can be used to
+If some value is required by one or more tasks of a flow, but it is not
+provided by any task, it is considered to be flow input, and **must** be put
+into the storage before the flow is run. A set of names required by a flow can
+be retrieved via that flow's ``requires`` property. These names can be used to
determine what names may be applicable for placing in storage ahead of time
and which names are not applicable.
All values provided by tasks of the flow are considered to be flow outputs; the
-set of names of such values is available via ``provides`` property of the flow.
+set of names of such values is available via the ``provides`` property of the
+flow.
.. testsetup::
@@ -59,8 +60,10 @@ As you can see, this flow does not require b, as it is provided by the fist
task.
.. note::
- There is no difference between processing of Task and Retry inputs
- and outputs.
+
+ There is no difference between processing of
+ :py:class:`Task <taskflow.task.BaseTask>` and
+ :py:class:`~taskflow.retry.Retry` inputs and outputs.
------------------
Engine and storage
@@ -146,8 +149,10 @@ Outputs
As you can see from examples above, the run method returns all flow outputs in
a ``dict``. This same data can be fetched via
-:py:meth:`~taskflow.storage.Storage.fetch_all` method of the storage. You can
-also get single results using :py:meth:`~taskflow.storage.Storage.fetch`.
+:py:meth:`~taskflow.storage.Storage.fetch_all` method of the engines storage
+object. You can also get single results using the
+engines storage objects :py:meth:`~taskflow.storage.Storage.fetch` method.
+
For example:
.. doctest::
diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst
index 3fe430d..755f7b1 100644
--- a/doc/source/notifications.rst
+++ b/doc/source/notifications.rst
@@ -1,5 +1,5 @@
===========================
-Notifications and Listeners
+Notifications and listeners
===========================
.. testsetup::
@@ -17,9 +17,9 @@ transitions, which is useful for monitoring, logging, metrics, debugging
and plenty of other tasks.
To receive these notifications you should register a callback with
-an instance of the the :py:class:`notifier <taskflow.utils.misc.Notifier>`
+an instance of the :py:class:`~taskflow.utils.misc.Notifier`
class that is attached
-to :py:class:`engine <taskflow.engines.base.EngineBase>`
+to :py:class:`Engine <taskflow.engines.base.EngineBase>`
attributes ``task_notifier`` and ``notifier``.
TaskFlow also comes with a set of predefined :ref:`listeners <listeners>`, and
@@ -30,17 +30,14 @@ using raw callbacks.
Receiving notifications with callbacks
--------------------------------------
-To manage notifications instances of
-:py:class:`~taskflow.utils.misc.Notifier` are used.
-
-.. autoclass:: taskflow.utils.misc.Notifier
-
Flow notifications
------------------
-To receive notification on flow state changes use
-:py:class:`~taskflow.utils.misc.Notifier` available as
-``notifier`` property of the engine. A basic example is:
+To receive notification on flow state changes use the
+:py:class:`~taskflow.utils.misc.Notifier` instance available as the
+``notifier`` property of an engine.
+
+A basic example is:
.. doctest::
@@ -71,9 +68,11 @@ To receive notification on flow state changes use
Task notifications
------------------
-To receive notification on task state changes use
-:py:class:`~taskflow.utils.misc.Notifier` available as
-``task_notifier`` property of the engine. A basic example is:
+To receive notification on task state changes use the
+:py:class:`~taskflow.utils.misc.Notifier` instance available as the
+``task_notifier`` property of an engine.
+
+A basic example is:
.. doctest::
diff --git a/doc/source/types.rst b/doc/source/types.rst
index a5afa67..1f573c8 100644
--- a/doc/source/types.rst
+++ b/doc/source/types.rst
@@ -7,6 +7,11 @@ Cache
.. automodule:: taskflow.types.cache
+Failure
+=======
+
+.. autoclass:: taskflow.utils.misc.Failure
+
FSM
===
@@ -17,6 +22,11 @@ Graph
.. automodule:: taskflow.types.graph
+Notifier
+========
+
+.. autoclass:: taskflow.utils.misc.Notifier
+
Table
=====
diff --git a/taskflow/engines/worker_based/worker.py b/taskflow/engines/worker_based/worker.py
index 49816ea..ee3ea15 100644
--- a/taskflow/engines/worker_based/worker.py
+++ b/taskflow/engines/worker_based/worker.py
@@ -69,34 +69,16 @@ class Worker(object):
:param url: broker url
:param exchange: broker exchange name
:param topic: topic name under which worker is stated
- :param tasks: tasks list that worker is capable to perform
-
- Tasks list item can be one of the following types:
- 1. String:
-
- 1.1 Python module name:
-
- > tasks=['taskflow.tests.utils']
-
- 1.2. Task class (BaseTask subclass) name:
-
- > tasks=['taskflow.test.utils.DummyTask']
-
- 3. Python module:
-
- > from taskflow.tests import utils
- > tasks=[utils]
-
- 4. Task class (BaseTask subclass):
-
- > from taskflow.tests import utils
- > tasks=[utils.DummyTask]
-
- :param executor: custom executor object that is used for processing
- requests in separate threads
- :keyword threads_count: threads count to be passed to the default executor
- :keyword transport: transport to be used (e.g. amqp, memory, etc.)
- :keyword transport_options: transport specific options
+ :param tasks: task list that worker is capable of performing, items in
+ the list can be one of the following types; 1, a string naming the
+ python module name to search for tasks in or the task class name; 2, a
+ python module to search for tasks in; 3, a task class object that
+ will be used to create tasks from.
+ :param executor: custom executor object that can used for processing
+ requests in separate threads (if not provided one will be created)
+ :param threads_count: threads count to be passed to the default executor
+ :param transport: transport to be used (e.g. amqp, memory, etc.)
+ :param transport_options: transport specific options
"""
def __init__(self, exchange, topic, tasks, executor=None, **kwargs):
diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py
index 035e86a..3984621 100644
--- a/taskflow/utils/misc.py
+++ b/taskflow/utils/misc.py
@@ -416,7 +416,10 @@ class Notifier(object):
notification occurs.
"""
+ #: Keys that can not be used in callbacks arguments
RESERVED_KEYS = ('details',)
+
+ #: Kleene star constant that is used to recieve all notifications
ANY = '*'
def __init__(self):
@@ -474,9 +477,9 @@ class Notifier(object):
Callback will be called with provided ``args`` and ``kwargs`` and
when event type occurs (or on any event if ``event_type`` equals to
- ``Notifier.ANY``). It will also get additional keyword argument,
- ``details``, that will hold event details provided to
- :py:meth:`notify` method.
+ :attr:`.ANY`). It will also get additional keyword argument,
+ ``details``, that will hold event details provided to the
+ :meth:`.notify` method.
"""
assert six.callable(callback), "Callback must be callable"
if self.is_registered(event_type, callback):
@@ -576,9 +579,10 @@ class Failure(object):
remote worker throws an exception, the WBE based engine will receive that
exception and desire to reraise it to the user/caller of the WBE based
engine for appropriate handling (this matches the behavior of non-remote
- engines). To accomplish this a failure object (or a to_dict() form) would
- be sent over the WBE channel and the WBE based engine would deserialize it
- and use this objects reraise() method to cause an exception that contains
+ engines). To accomplish this a failure object (or a
+ :py:meth:`~misc.Failure.to_dict` form) would be sent over the WBE channel
+ and the WBE based engine would deserialize it and use this objects
+ :meth:`.reraise` method to cause an exception that contains
similar/equivalent information as the original exception to be reraised,
allowing the user (or the WBE engine itself) to then handle the worker
failure/exception as they desire.
@@ -642,6 +646,7 @@ class Failure(object):
@classmethod
def from_exception(cls, exception):
+ """Creates a failure object from a exception instance."""
return cls((type(exception), exception, None))
def _matches(self, other):
@@ -652,6 +657,7 @@ class Failure(object):
and self.traceback_str == other.traceback_str)
def matches(self, other):
+ """Checks if another object is equivalent to this object."""
if not isinstance(other, Failure):
return False
if self.exc_info is None or other.exc_info is None:
@@ -706,9 +712,10 @@ class Failure(object):
"""Re-raise exceptions if argument is not empty.
If argument is empty list, this method returns None. If
- argument is list with single Failure object in it,
- this failure is reraised. Else, WrappedFailure exception
- is raised with failures list as causes.
+ argument is a list with a single ``Failure`` object in it,
+ that failure is reraised. Else, a
+ :class:`~taskflow.exceptions.WrappedFailure` exception
+ is raised with a failure list as causes.
"""
failures = list(failures)
if len(failures) == 1:
@@ -724,7 +731,7 @@ class Failure(object):
raise exc.WrappedFailure([self])
def check(self, *exc_classes):
- """Check if any of exc_classes caused the failure.
+ """Check if any of ``exc_classes`` caused the failure.
Arguments of this method can be exception types or type
names (stings). If captured exception is instance of
@@ -744,6 +751,7 @@ class Failure(object):
return self.pformat()
def pformat(self, traceback=False):
+ """Pretty formats the failure object into a string."""
buf = six.StringIO()
buf.write(
'Failure: %s: %s' % (self._exc_type_names[0], self._exception_str))
@@ -766,6 +774,7 @@ class Failure(object):
@classmethod
def from_dict(cls, data):
+ """Converts this from a dictionary to a object."""
data = dict(data)
version = data.pop('version', None)
if version != cls.DICT_VERSION:
@@ -774,6 +783,7 @@ class Failure(object):
return cls(**data)
def to_dict(self):
+ """Converts this object to a dictionary."""
return {
'exception_str': self.exception_str,
'traceback_str': self.traceback_str,
@@ -782,6 +792,7 @@ class Failure(object):
}
def copy(self):
+ """Copies this object."""
return Failure(exc_info=copy_exc_info(self.exc_info),
exception_str=self.exception_str,
traceback_str=self.traceback_str,