diff options
author | Daniel G. Taylor <dan@programmer-art.org> | 2014-07-01 13:22:27 -0700 |
---|---|---|
committer | Daniel G. Taylor <dan@programmer-art.org> | 2014-07-01 13:22:27 -0700 |
commit | 4835676a72339de5944d17621fea8f228f9d00b2 (patch) | |
tree | 7d1577faf97e4fac8c445b5a48419a8911f4f5a1 /docs | |
parent | 45fda90061fc42ae72d3561e6ceb88a22ef4bb53 (diff) | |
parent | 1d2c6ecbf1522e29ec9a4de6df405fd1ef19c4d8 (diff) | |
download | boto-4835676a72339de5944d17621fea8f228f9d00b2.tar.gz |
Merge pull request #2333 from b0morris/develop
Documentation update -- Child workflows and poll API. Fixes #2063, #2064, #2333.
Diffstat (limited to 'docs')
-rw-r--r-- | docs/source/swf_tut.rst | 148 |
1 files changed, 139 insertions, 9 deletions
diff --git a/docs/source/swf_tut.rst b/docs/source/swf_tut.rst index 68588265..ffbacfd2 100644 --- a/docs/source/swf_tut.rst +++ b/docs/source/swf_tut.rst @@ -1,5 +1,5 @@ .. swf_tut: - :Authors: Slawek "oozie" Ligus <root@ooz.ie> + :Authors: Slawek "oozie" Ligus <root@ooz.ie>, Brad Morris <bradley.s.morris@gmail.com> =============================== Amazon Simple Workflow Tutorial @@ -60,7 +60,7 @@ Before workflows and activities can be used, they have to be registered with SWF registerables = [] registerables.append(swf.Domain(name=DOMAIN)) - for workflow_type in ('HelloWorkflow', 'SerialWorkflow', 'ParallelWorkflow'): + for workflow_type in ('HelloWorkflow', 'SerialWorkflow', 'ParallelWorkflow', 'SubWorkflow'): registerables.append(swf.WorkflowType(domain=DOMAIN, name=workflow_type, version=VERSION, task_list='default')) for activity_type in ('HelloWorld', 'ActivityA', 'ActivityB', 'ActivityC'): @@ -441,11 +441,11 @@ The decider schedules all activities at once and marks progress until all activi import boto.swf.layer2 as swf import time - + SCHED_COUNT = 5 - + class ParallelDecider(swf.Decider): - + domain = 'boto_tutorial' task_list = 'default' def run(self): @@ -480,12 +480,12 @@ Again, the only bit of information a worker needs is which task list to poll. # parallel_worker.py import time import boto.swf.layer2 as swf - + class ParallelWorker(swf.ActivityWorker): - + domain = 'boto_tutorial' task_list = 'default' - + def run(self): """Report current time.""" activity_task = self.poll() @@ -517,7 +517,7 @@ Run two or more workers to see how the service partitions work execution in para working on activity1 working on activity3 working on activity4 - + .. code-block:: bash $ python -i parallel_worker.py @@ -528,6 +528,136 @@ Run two or more workers to see how the service partitions work execution in para As seen above, the work was partitioned between the two running workers. +Sub-Workflows +------------- + +Sometimes it's desired or necessary to break the process up into multiple workflows. + +Since the decider is stateless, it's up to you to determine which workflow is being used and which action +you would like to take. + +.. code-block:: python + + import boto.swf.layer2 as swf + + class SubWorkflowDecider(swf.Decider): + + domain = 'boto_tutorial' + task_list = 'default' + version = '1.0' + + def run(self): + history = self.poll() + events = [] + if 'events' in history: + events = history['events'] + # Collect the entire history if there are enough events to become paginated + while 'nextPageToken' in history: + history = self.poll(next_page_token=history['nextPageToken']) + if 'events' in history: + events = events + history['events'] + + workflow_type = history['workflowType']['name'] + + # Get all of the relevent events that have happened since the last decision task was started + workflow_events = [e for e in events + if e['eventId'] > history['previousStartedEventId'] and + not e['eventType'].startswith('Decision')] + + decisions = swf.Layer1Decisions() + + for event in workflow_events: + last_event_type = event['eventType'] + if last_event_type == 'WorkflowExecutionStarted': + if workflow_type == 'SerialWorkflow': + decisions.start_child_workflow_execution('SubWorkflow', self.version, + "subworkflow_1", task_list=self.task_list, input="sub_1") + elif workflow_type == 'SubWorkflow': + for i in range(2): + decisions.schedule_activity_task("activity_%d" % i, 'ActivityA', self.version, task_list='a_tasks') + else: + decisions.fail_workflow_execution(reason="Unknown workflow %s" % workflow_type) + break + + elif last_event_type == 'ChildWorkflowExecutionCompleted': + decisions.schedule_activity_task("activity_2", 'ActivityB', self.version, task_list='b_tasks') + + elif last_event_type == 'ActivityTaskCompleted': + attrs = event['activityTaskCompletedEventAttributes'] + activity = events[attrs['scheduledEventId'] - 1] + activity_name = activity['activityTaskScheduledEventAttributes']['activityType']['name'] + + if activity_name == 'ActivityA': + completed_count = sum([1 for a in events if a['eventType'] == 'ActivityTaskCompleted']) + if completed_count == 2: + # Complete the child workflow + decisions.complete_workflow_execution() + elif activity_name == 'ActivityB': + # Complete the parent workflow + decisions.complete_workflow_execution() + + self.complete(decisions=decisions) + return True + +Misc +---- + +Some of these things are not obvious by reading the API documents, so hopefully they help you +avoid some time-consuming pitfalls. + +Pagination +========== + +When the decider polls for new tasks, the maximum number of events it will return at a time is 100 +(configurable to a smaller number, but not larger). When running a workflow, this number gets quickly +exceeded. If it does, the decision task will contain a key ``nextPageToken`` which can be submit to the +``poll()`` call to get the next page of events. + +.. code-block:: python + + decision_task = self.poll() + + events = [] + if 'events' in decision_task: + events = decision_task['events'] + while 'nextPageToken' in decision_task: + decision_task = self.poll(next_page_token=decision_task['nextPageToken']) + if 'events' in decision_task: + events += decision_task['events'] + +Depending on your workflow logic, you might not need to aggregate all of the events. + +Decision Tasks +============== + +When first running deciders and activities, it may seem that the decider gets called for every event that +an activity triggers; however, this is not the case. More than one event can happen between decision tasks. +The decision task will contain a key ``previousStartedEventId`` that lets you know the ``eventId`` of the +last DecisionTaskStarted event that was processed. Your script will need to handle all of the events +that have happened since then, not just the last activity. + +.. code-block:: python + + workflow_events = [e for e in events if e['eventId'] > decision_task['previousStartedEventId']] + +You may also wish to still filter out tasks that start with 'Decision' or filter it in some other way +that fulfills your needs. You will now have to iterate over the workflow_events list and respond to +each event, as it may contain multiple events. + +Filtering Events +================ + +When running many tasks in parallel, a common task is searching through the history to see how many events +of a particular activity type started, completed, and/or failed. Some basic list comprehension makes +this trivial. + +.. code-block:: python + + def filter_completed_events(self, events, type): + completed = [e for e in events if e['eventType'] == 'ActivityTaskCompleted'] + orig = [events[e['activityTaskCompletedEventAttributes']['scheduledEventId']-1] for e in completed] + return [e for e in orig if e['activityTaskScheduledEventAttributes']['activityType']['name'] == type] + .. _Amazon SWF API Reference: http://docs.aws.amazon.com/amazonswf/latest/apireference/Welcome.html .. _StackOverflow questions: http://stackoverflow.com/questions/tagged/amazon-swf .. _Miscellaneous Blog Articles: http://log.ooz.ie/search/label/SimpleWorkflow |