summaryrefslogtreecommitdiff
path: root/docs
diff options
context:
space:
mode:
authorDaniel G. Taylor <dan@programmer-art.org>2014-07-01 13:22:27 -0700
committerDaniel G. Taylor <dan@programmer-art.org>2014-07-01 13:22:27 -0700
commit4835676a72339de5944d17621fea8f228f9d00b2 (patch)
tree7d1577faf97e4fac8c445b5a48419a8911f4f5a1 /docs
parent45fda90061fc42ae72d3561e6ceb88a22ef4bb53 (diff)
parent1d2c6ecbf1522e29ec9a4de6df405fd1ef19c4d8 (diff)
downloadboto-4835676a72339de5944d17621fea8f228f9d00b2.tar.gz
Merge pull request #2333 from b0morris/develop
Documentation update -- Child workflows and poll API. Fixes #2063, #2064, #2333.
Diffstat (limited to 'docs')
-rw-r--r--docs/source/swf_tut.rst148
1 files changed, 139 insertions, 9 deletions
diff --git a/docs/source/swf_tut.rst b/docs/source/swf_tut.rst
index 68588265..ffbacfd2 100644
--- a/docs/source/swf_tut.rst
+++ b/docs/source/swf_tut.rst
@@ -1,5 +1,5 @@
.. swf_tut:
- :Authors: Slawek "oozie" Ligus <root@ooz.ie>
+ :Authors: Slawek "oozie" Ligus <root@ooz.ie>, Brad Morris <bradley.s.morris@gmail.com>
===============================
Amazon Simple Workflow Tutorial
@@ -60,7 +60,7 @@ Before workflows and activities can be used, they have to be registered with SWF
registerables = []
registerables.append(swf.Domain(name=DOMAIN))
- for workflow_type in ('HelloWorkflow', 'SerialWorkflow', 'ParallelWorkflow'):
+ for workflow_type in ('HelloWorkflow', 'SerialWorkflow', 'ParallelWorkflow', 'SubWorkflow'):
registerables.append(swf.WorkflowType(domain=DOMAIN, name=workflow_type, version=VERSION, task_list='default'))
for activity_type in ('HelloWorld', 'ActivityA', 'ActivityB', 'ActivityC'):
@@ -441,11 +441,11 @@ The decider schedules all activities at once and marks progress until all activi
import boto.swf.layer2 as swf
import time
-
+
SCHED_COUNT = 5
-
+
class ParallelDecider(swf.Decider):
-
+
domain = 'boto_tutorial'
task_list = 'default'
def run(self):
@@ -480,12 +480,12 @@ Again, the only bit of information a worker needs is which task list to poll.
# parallel_worker.py
import time
import boto.swf.layer2 as swf
-
+
class ParallelWorker(swf.ActivityWorker):
-
+
domain = 'boto_tutorial'
task_list = 'default'
-
+
def run(self):
"""Report current time."""
activity_task = self.poll()
@@ -517,7 +517,7 @@ Run two or more workers to see how the service partitions work execution in para
working on activity1
working on activity3
working on activity4
-
+
.. code-block:: bash
$ python -i parallel_worker.py
@@ -528,6 +528,136 @@ Run two or more workers to see how the service partitions work execution in para
As seen above, the work was partitioned between the two running workers.
+Sub-Workflows
+-------------
+
+Sometimes it's desired or necessary to break the process up into multiple workflows.
+
+Since the decider is stateless, it's up to you to determine which workflow is being used and which action
+you would like to take.
+
+.. code-block:: python
+
+ import boto.swf.layer2 as swf
+
+ class SubWorkflowDecider(swf.Decider):
+
+ domain = 'boto_tutorial'
+ task_list = 'default'
+ version = '1.0'
+
+ def run(self):
+ history = self.poll()
+ events = []
+ if 'events' in history:
+ events = history['events']
+ # Collect the entire history if there are enough events to become paginated
+ while 'nextPageToken' in history:
+ history = self.poll(next_page_token=history['nextPageToken'])
+ if 'events' in history:
+ events = events + history['events']
+
+ workflow_type = history['workflowType']['name']
+
+ # Get all of the relevent events that have happened since the last decision task was started
+ workflow_events = [e for e in events
+ if e['eventId'] > history['previousStartedEventId'] and
+ not e['eventType'].startswith('Decision')]
+
+ decisions = swf.Layer1Decisions()
+
+ for event in workflow_events:
+ last_event_type = event['eventType']
+ if last_event_type == 'WorkflowExecutionStarted':
+ if workflow_type == 'SerialWorkflow':
+ decisions.start_child_workflow_execution('SubWorkflow', self.version,
+ "subworkflow_1", task_list=self.task_list, input="sub_1")
+ elif workflow_type == 'SubWorkflow':
+ for i in range(2):
+ decisions.schedule_activity_task("activity_%d" % i, 'ActivityA', self.version, task_list='a_tasks')
+ else:
+ decisions.fail_workflow_execution(reason="Unknown workflow %s" % workflow_type)
+ break
+
+ elif last_event_type == 'ChildWorkflowExecutionCompleted':
+ decisions.schedule_activity_task("activity_2", 'ActivityB', self.version, task_list='b_tasks')
+
+ elif last_event_type == 'ActivityTaskCompleted':
+ attrs = event['activityTaskCompletedEventAttributes']
+ activity = events[attrs['scheduledEventId'] - 1]
+ activity_name = activity['activityTaskScheduledEventAttributes']['activityType']['name']
+
+ if activity_name == 'ActivityA':
+ completed_count = sum([1 for a in events if a['eventType'] == 'ActivityTaskCompleted'])
+ if completed_count == 2:
+ # Complete the child workflow
+ decisions.complete_workflow_execution()
+ elif activity_name == 'ActivityB':
+ # Complete the parent workflow
+ decisions.complete_workflow_execution()
+
+ self.complete(decisions=decisions)
+ return True
+
+Misc
+----
+
+Some of these things are not obvious by reading the API documents, so hopefully they help you
+avoid some time-consuming pitfalls.
+
+Pagination
+==========
+
+When the decider polls for new tasks, the maximum number of events it will return at a time is 100
+(configurable to a smaller number, but not larger). When running a workflow, this number gets quickly
+exceeded. If it does, the decision task will contain a key ``nextPageToken`` which can be submit to the
+``poll()`` call to get the next page of events.
+
+.. code-block:: python
+
+ decision_task = self.poll()
+
+ events = []
+ if 'events' in decision_task:
+ events = decision_task['events']
+ while 'nextPageToken' in decision_task:
+ decision_task = self.poll(next_page_token=decision_task['nextPageToken'])
+ if 'events' in decision_task:
+ events += decision_task['events']
+
+Depending on your workflow logic, you might not need to aggregate all of the events.
+
+Decision Tasks
+==============
+
+When first running deciders and activities, it may seem that the decider gets called for every event that
+an activity triggers; however, this is not the case. More than one event can happen between decision tasks.
+The decision task will contain a key ``previousStartedEventId`` that lets you know the ``eventId`` of the
+last DecisionTaskStarted event that was processed. Your script will need to handle all of the events
+that have happened since then, not just the last activity.
+
+.. code-block:: python
+
+ workflow_events = [e for e in events if e['eventId'] > decision_task['previousStartedEventId']]
+
+You may also wish to still filter out tasks that start with 'Decision' or filter it in some other way
+that fulfills your needs. You will now have to iterate over the workflow_events list and respond to
+each event, as it may contain multiple events.
+
+Filtering Events
+================
+
+When running many tasks in parallel, a common task is searching through the history to see how many events
+of a particular activity type started, completed, and/or failed. Some basic list comprehension makes
+this trivial.
+
+.. code-block:: python
+
+ def filter_completed_events(self, events, type):
+ completed = [e for e in events if e['eventType'] == 'ActivityTaskCompleted']
+ orig = [events[e['activityTaskCompletedEventAttributes']['scheduledEventId']-1] for e in completed]
+ return [e for e in orig if e['activityTaskScheduledEventAttributes']['activityType']['name'] == type]
+
.. _Amazon SWF API Reference: http://docs.aws.amazon.com/amazonswf/latest/apireference/Welcome.html
.. _StackOverflow questions: http://stackoverflow.com/questions/tagged/amazon-swf
.. _Miscellaneous Blog Articles: http://log.ooz.ie/search/label/SimpleWorkflow