summaryrefslogtreecommitdiff
path: root/docs/source/swf_tut.rst
blob: ffbacfd2c636cf3e02b9064c1068ef30141bf727 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
.. swf_tut:
 :Authors: Slawek "oozie" Ligus <root@ooz.ie>, Brad Morris <bradley.s.morris@gmail.com>

===============================
Amazon Simple Workflow Tutorial
===============================

This tutorial focuses on boto's interface to AWS SimpleWorkflow service.

.. _SimpleWorkflow: http://aws.amazon.com/swf/

What is a workflow?
-------------------

A workflow is a sequence of multiple activities aimed at accomplishing a well-defined objective. For instance, booking an airline ticket as a workflow may encompass multiple activities, such as selection of itinerary, submission of personal details, payment validation and booking confirmation. 

Except for the start and completion of a workflow, each step has a well-defined predecessor and successor. With that
  - on successful completion of an activity the workflow can progress with its execution,
  - when one of workflow's activities fails it can be retried,
  - and when it keeps failing repeatedly the workflow may regress to the previous step to gather alternative inputs or it may simply fail at that stage.

Why use workflows?
------------------

Modelling an application on a workflow provides a useful abstraction layer for writing highly-reliable programs for distributed systems, as individual responsibilities can be delegated to a set of redundant, independent and non-critical processing units.

How does Amazon SWF help you accomplish this?
---------------------------------------------

Amazon SimpleWorkflow service defines an interface for workflow orchestration and provides state persistence for workflow executions.

Amazon SWF applications involve communication between the following entities:
  - The Amazon Simple Workflow Service - providing centralized orchestration and workflow state persistence,
  - Workflow Executors - some entity starting workflow executions, typically through an action taken by a user or from a cronjob.
  - Deciders - a program codifying the business logic, i.e. a set of instructions and decisions. Deciders take decisions based on initial set of conditions and outcomes from activities.
  - Activity Workers - their objective is very straightforward: to take inputs, execute the tasks and return a result to the Service.

The Workflow Executor contacts SWF Service and requests instantiation of a workflow. A new workflow is created and its state is stored in the service. 
The next time a decider contacts SWF service to ask for a decision task, it will be informed about a new workflow execution is taking place and it will be asked to advise SWF service on what the next steps should be. The decider then instructs the service to dispatch specific tasks to activity workers. At the next activity worker poll, the task is dispatched, then executed and the results reported back to the SWF, which then passes them onto the deciders. This exchange keeps happening repeatedly until the decider is satisfied and instructs the service to complete the execution.

Prerequisites
-------------

You need a valid access and secret key. The examples below assume that you have exported them to your environment, as follows:

.. code-block:: bash

    bash$ export AWS_ACCESS_KEY_ID=<your access key>
    bash$ export AWS_SECRET_ACCESS_KEY=<your secret key>

Before workflows and activities can be used, they have to be registered with SWF service:

.. code-block:: python

    # register.py
    import boto.swf.layer2 as swf
    from boto.swf.exceptions import SWFTypeAlreadyExistsError, SWFDomainAlreadyExistsError
    DOMAIN = 'boto_tutorial'
    VERSION = '1.0'
    
    registerables = []
    registerables.append(swf.Domain(name=DOMAIN))
    for workflow_type in ('HelloWorkflow', 'SerialWorkflow', 'ParallelWorkflow', 'SubWorkflow'):
        registerables.append(swf.WorkflowType(domain=DOMAIN, name=workflow_type, version=VERSION, task_list='default'))
    
    for activity_type in ('HelloWorld', 'ActivityA', 'ActivityB', 'ActivityC'):
        registerables.append(swf.ActivityType(domain=DOMAIN, name=activity_type, version=VERSION, task_list='default'))
    
    for swf_entity in registerables:
        try:
            swf_entity.register()
            print swf_entity.name, 'registered successfully'
        except (SWFDomainAlreadyExistsError, SWFTypeAlreadyExistsError):
            print swf_entity.__class__.__name__, swf_entity.name, 'already exists'
            
    
Execution of the above should produce no errors.

.. code-block:: bash

    bash$ python -i register.py
    Domain boto_tutorial already exists
    WorkflowType HelloWorkflow already exists
    SerialWorkflow registered successfully
    ParallelWorkflow registered successfully
    ActivityType HelloWorld already exists
    ActivityA registered successfully
    ActivityB registered successfully
    ActivityC registered successfully
    >>> 

HelloWorld
----------

This example is an implementation of a minimal Hello World workflow. Its execution should unfold as follows:

#. A workflow execution is started.
#. The SWF service schedules the initial decision task.
#. A decider polls for decision tasks and receives one.
#. The decider requests scheduling of an activity task.
#. The SWF service schedules the greeting activity task.
#. An activity worker polls for activity task and receives one.
#. The worker completes the greeting activity.
#. The SWF service schedules a decision task to inform about work outcome.
#. The decider polls and receives a new decision task.
#. The decider schedules workflow completion.
#. The workflow execution finishes.

Workflow logic is encoded in the decider:

.. code-block:: python

    # hello_decider.py
    import boto.swf.layer2 as swf
    
    DOMAIN = 'boto_tutorial'
    ACTIVITY = 'HelloWorld'
    VERSION = '1.0'
    TASKLIST = 'default'
    
    class HelloDecider(swf.Decider):
    
        domain = DOMAIN
        task_list = TASKLIST
        version = VERSION
    
        def run(self):
            history = self.poll()
            if 'events' in history:
                # Find workflow events not related to decision scheduling.
                workflow_events = [e for e in history['events']
                    if not e['eventType'].startswith('Decision')]
                last_event = workflow_events[-1]
    
                decisions = swf.Layer1Decisions()
                if last_event['eventType'] == 'WorkflowExecutionStarted':
                    decisions.schedule_activity_task('saying_hi', ACTIVITY, VERSION, task_list=TASKLIST)
                elif last_event['eventType'] == 'ActivityTaskCompleted':
                    decisions.complete_workflow_execution()
                self.complete(decisions=decisions)
                return True   
    
The activity worker is responsible for printing the greeting message when the activity task is dispatched to it by the service:

.. code-block:: python

    import boto.swf.layer2 as swf
    
    DOMAIN = 'boto_tutorial'
    VERSION = '1.0'
    TASKLIST = 'default'
    
    class HelloWorker(swf.ActivityWorker):
    
        domain = DOMAIN
        version = VERSION
        task_list = TASKLIST
    
        def run(self):
            activity_task = self.poll()
            if 'activityId' in activity_task:
                print 'Hello, World!'
                self.complete()
                return True

With actors implemented we can spin up a workflow execution:

.. code-block:: bash

    $ python
    >>> import boto.swf.layer2 as swf
    >>> execution = swf.WorkflowType(name='HelloWorkflow', domain='boto_tutorial', version='1.0', task_list='default').start()
    >>> 
    
From separate terminals run an instance of a worker and a decider to carry out a workflow execution (the worker and decider may run from two independent machines).

.. code-block:: bash

   $ python -i hello_decider.py
   >>> while HelloDecider().run(): pass
   ... 

.. code-block:: bash

   $ python -i hello_worker.py
   >>> while HelloWorker().run(): pass
   ... 
   Hello, World!

Great. Now, to see what just happened, go back to the original terminal from which the execution was started, and read its history.

.. code-block:: bash

    >>> execution.history()
    [{'eventId': 1,
      'eventTimestamp': 1381095173.2539999,
      'eventType': 'WorkflowExecutionStarted',
      'workflowExecutionStartedEventAttributes': {'childPolicy': 'TERMINATE',
                                                  'executionStartToCloseTimeout': '3600',
                                                  'parentInitiatedEventId': 0,
                                                  'taskList': {'name': 'default'},
                                                  'taskStartToCloseTimeout': '300',
                                                  'workflowType': {'name': 'HelloWorkflow',
                                                                   'version': '1.0'}}},
     {'decisionTaskScheduledEventAttributes': {'startToCloseTimeout': '300',
                                               'taskList': {'name': 'default'}},
      'eventId': 2,
      'eventTimestamp': 1381095173.2539999,
      'eventType': 'DecisionTaskScheduled'},
     {'decisionTaskStartedEventAttributes': {'scheduledEventId': 2},
      'eventId': 3,
      'eventTimestamp': 1381095177.5439999,
      'eventType': 'DecisionTaskStarted'},
     {'decisionTaskCompletedEventAttributes': {'scheduledEventId': 2,
                                               'startedEventId': 3},
      'eventId': 4,
      'eventTimestamp': 1381095177.855,
      'eventType': 'DecisionTaskCompleted'},
     {'activityTaskScheduledEventAttributes': {'activityId': 'saying_hi',
                                               'activityType': {'name': 'HelloWorld',
                                                                'version': '1.0'},
                                               'decisionTaskCompletedEventId': 4,
                                               'heartbeatTimeout': '600',
                                               'scheduleToCloseTimeout': '3900',
                                               'scheduleToStartTimeout': '300',
                                               'startToCloseTimeout': '3600',
                                               'taskList': {'name': 'default'}},
      'eventId': 5,
      'eventTimestamp': 1381095177.855,
      'eventType': 'ActivityTaskScheduled'},
     {'activityTaskStartedEventAttributes': {'scheduledEventId': 5},
      'eventId': 6,
      'eventTimestamp': 1381095179.427,
      'eventType': 'ActivityTaskStarted'},
     {'activityTaskCompletedEventAttributes': {'scheduledEventId': 5,
                                               'startedEventId': 6},
      'eventId': 7,
      'eventTimestamp': 1381095179.6989999,
      'eventType': 'ActivityTaskCompleted'},
     {'decisionTaskScheduledEventAttributes': {'startToCloseTimeout': '300',
                                               'taskList': {'name': 'default'}},
      'eventId': 8,
      'eventTimestamp': 1381095179.6989999,
      'eventType': 'DecisionTaskScheduled'},
     {'decisionTaskStartedEventAttributes': {'scheduledEventId': 8},
      'eventId': 9,
      'eventTimestamp': 1381095179.7420001,
      'eventType': 'DecisionTaskStarted'},
     {'decisionTaskCompletedEventAttributes': {'scheduledEventId': 8,
                                               'startedEventId': 9},
      'eventId': 10,
      'eventTimestamp': 1381095180.026,
      'eventType': 'DecisionTaskCompleted'},
     {'eventId': 11,
      'eventTimestamp': 1381095180.026,
      'eventType': 'WorkflowExecutionCompleted',
      'workflowExecutionCompletedEventAttributes': {'decisionTaskCompletedEventId': 10}}]
    

Serial Activity Execution
-------------------------

The following example implements a basic workflow with activities executed one after another.

The business logic, i.e. the serial execution of activities, is encoded in the decider:

.. code-block:: python

    # serial_decider.py
    import time
    import boto.swf.layer2 as swf
    
    class SerialDecider(swf.Decider):
    
        domain = 'boto_tutorial'
        task_list = 'default_tasks'
        version = '1.0'
    
        def run(self):
            history = self.poll()
            if 'events' in history:
                # Get a list of non-decision events to see what event came in last.
                workflow_events = [e for e in history['events']
                                   if not e['eventType'].startswith('Decision')]
                decisions = swf.Layer1Decisions()
                # Record latest non-decision event.
                last_event = workflow_events[-1]
                last_event_type = last_event['eventType']
                if last_event_type == 'WorkflowExecutionStarted':
                    # Schedule the first activity.
                    decisions.schedule_activity_task('%s-%i' % ('ActivityA', time.time()),
                       'ActivityA', self.version, task_list='a_tasks')
                elif last_event_type == 'ActivityTaskCompleted':
                    # Take decision based on the name of activity that has just completed.
                    # 1) Get activity's event id.
                    last_event_attrs = last_event['activityTaskCompletedEventAttributes']
                    completed_activity_id = last_event_attrs['scheduledEventId'] - 1
                    # 2) Extract its name.
                    activity_data = history['events'][completed_activity_id]
                    activity_attrs = activity_data['activityTaskScheduledEventAttributes']
                    activity_name = activity_attrs['activityType']['name']
                    # 3) Optionally, get the result from the activity.
                    result = last_event['activityTaskCompletedEventAttributes'].get('result')
    
                    # Take the decision.
                    if activity_name == 'ActivityA':
                        decisions.schedule_activity_task('%s-%i' % ('ActivityB', time.time()),
                            'ActivityB', self.version, task_list='b_tasks', input=result)
                    if activity_name == 'ActivityB':
                        decisions.schedule_activity_task('%s-%i' % ('ActivityC', time.time()),
                            'ActivityC', self.version, task_list='c_tasks', input=result)
                    elif activity_name == 'ActivityC':
                        # Final activity completed. We're done.
                        decisions.complete_workflow_execution()
    
                self.complete(decisions=decisions)
                return True

The workers only need to know which task lists to poll.

.. code-block:: python

    # serial_worker.py
    import time
    import boto.swf.layer2 as swf
    
    class MyBaseWorker(swf.ActivityWorker):
    
        domain = 'boto_tutorial'
        version = '1.0'
        task_list = None
    
        def run(self):
            activity_task = self.poll()
            if 'activityId' in activity_task:
                # Get input.
                # Get the method for the requested activity.
                try:
                    print 'working on activity from tasklist %s at %i' % (self.task_list, time.time())
                    self.activity(activity_task.get('input'))
                except Exception, error:
                    self.fail(reason=str(error))
                    raise error
    
                return True
    
        def activity(self, activity_input):
            raise NotImplementedError
    
    class WorkerA(MyBaseWorker):
        task_list = 'a_tasks'
        def activity(self, activity_input):
            self.complete(result="Now don't be givin him sambuca!")
    
    class WorkerB(MyBaseWorker):
        task_list = 'b_tasks'
        def activity(self, activity_input):
            self.complete()
    
    class WorkerC(MyBaseWorker):
        task_list = 'c_tasks'
        def activity(self, activity_input):
            self.complete()


Spin up a workflow execution and run the decider:

.. code-block:: bash

    $ python
    >>> import boto.swf.layer2 as swf
    >>> execution = swf.WorkflowType(name='SerialWorkflow', domain='boto_tutorial', version='1.0', task_list='default_tasks').start()
    >>> 
    
.. code-block:: bash

   $ python -i serial_decider.py
   >>> while SerialDecider().run(): pass
   ... 


Run the workers. The activities will be executed in order:

.. code-block:: bash

   $ python -i serial_worker.py
   >>> while WorkerA().run(): pass
   ... 
   working on activity from tasklist a_tasks at 1382046291

.. code-block:: bash

   $ python -i serial_worker.py
   >>> while WorkerB().run(): pass
   ... 
   working on activity from tasklist b_tasks at 1382046541

.. code-block:: bash

   $ python -i serial_worker.py
   >>> while WorkerC().run(): pass
   ... 
   working on activity from tasklist c_tasks at 1382046560


Looks good. Now, do the following to inspect the state and history of the execution:

.. code-block:: python

    >>> execution.describe()
    {'executionConfiguration': {'childPolicy': 'TERMINATE',
      'executionStartToCloseTimeout': '3600',
      'taskList': {'name': 'default_tasks'},
      'taskStartToCloseTimeout': '300'},
     'executionInfo': {'cancelRequested': False,
      'closeStatus': 'COMPLETED',
      'closeTimestamp': 1382046560.901,
      'execution': {'runId': '12fQ1zSaLmI5+lLXB8ux+8U+hLOnnXNZCY9Zy+ZvXgzhE=',
       'workflowId': 'SerialWorkflow-1.0-1382046514'},
      'executionStatus': 'CLOSED',
      'startTimestamp': 1382046514.994,
      'workflowType': {'name': 'SerialWorkflow', 'version': '1.0'}},
     'latestActivityTaskTimestamp': 1382046560.632,
     'openCounts': {'openActivityTasks': 0,
      'openChildWorkflowExecutions': 0,
      'openDecisionTasks': 0,
      'openTimers': 0}}
    >>> execution.history()
    ...

Parallel Activity Execution
---------------------------

When activities are independent from one another, their execution may be scheduled in parallel.

The decider schedules all activities at once and marks progress until all activities are completed, at which point the workflow is completed.

.. code-block:: python

    # parallel_decider.py

    import boto.swf.layer2 as swf
    import time

    SCHED_COUNT = 5

    class ParallelDecider(swf.Decider):

        domain = 'boto_tutorial'
        task_list = 'default'
        def run(self):
            decision_task = self.poll()
            if 'events' in decision_task:
                decisions = swf.Layer1Decisions()
                # Decision* events are irrelevant here and can be ignored.
                workflow_events = [e for e in decision_task['events'] 
                                   if not e['eventType'].startswith('Decision')]
                # Record latest non-decision event.
                last_event = workflow_events[-1]
                last_event_type = last_event['eventType']
                if last_event_type == 'WorkflowExecutionStarted':
                    # At start, kickoff SCHED_COUNT activities in parallel.
                    for i in range(SCHED_COUNT):
                        decisions.schedule_activity_task('activity%i' % i, 'ActivityA', '1.0',
                                                         task_list=self.task_list)
                elif last_event_type == 'ActivityTaskCompleted':
                    # Monitor progress. When all activities complete, complete workflow.
                    completed_count = sum([1 for a in decision_task['events']
                                           if a['eventType'] == 'ActivityTaskCompleted'])
                    print '%i/%i' % (completed_count, SCHED_COUNT)
                    if completed_count == SCHED_COUNT:
                        decisions.complete_workflow_execution()
                self.complete(decisions=decisions)
                return True

Again, the only bit of information a worker needs is which task list to poll.

.. code-block:: python

    # parallel_worker.py
    import time
    import boto.swf.layer2 as swf

    class ParallelWorker(swf.ActivityWorker):

        domain = 'boto_tutorial'
        task_list = 'default'

        def run(self):
            """Report current time."""
            activity_task = self.poll()
            if 'activityId' in activity_task:
                print 'working on', activity_task['activityId']
                self.complete(result=str(time.time()))
                return True

Spin up a workflow execution and run the decider:

.. code-block:: bash

    $ python -i parallel_decider.py 
    >>> execution = swf.WorkflowType(name='ParallelWorkflow', domain='boto_tutorial', version='1.0', task_list='default').start()
    >>> while ParallelDecider().run(): pass
    ... 
    1/5
    2/5
    4/5
    5/5

Run two or more workers to see how the service partitions work execution in parallel.

.. code-block:: bash

    $ python -i parallel_worker.py 
    >>> while ParallelWorker().run(): pass
    ... 
    working on activity1
    working on activity3
    working on activity4

.. code-block:: bash

    $ python -i parallel_worker.py 
    >>> while ParallelWorker().run(): pass
    ... 
    working on activity2
    working on activity0

As seen above, the work was partitioned between the two running workers.

Sub-Workflows
-------------

Sometimes it's desired or necessary to break the process up into multiple workflows.

Since the decider is stateless, it's up to you to determine which workflow is being used and which action
you would like to take.

.. code-block:: python

    import boto.swf.layer2 as swf

    class SubWorkflowDecider(swf.Decider):

        domain = 'boto_tutorial'
        task_list = 'default'
        version = '1.0'

        def run(self):
            history = self.poll()
            events = []
            if 'events' in history:
                events = history['events']
                # Collect the entire history if there are enough events to become paginated
                while 'nextPageToken' in history:
                    history = self.poll(next_page_token=history['nextPageToken'])
                    if 'events' in history:
                        events = events + history['events']

                workflow_type = history['workflowType']['name']

                # Get all of the relevent events that have happened since the last decision task was started
                workflow_events = [e for e in events
                        if e['eventId'] > history['previousStartedEventId'] and
                        not e['eventType'].startswith('Decision')]

                decisions = swf.Layer1Decisions()

                for event in workflow_events:
                    last_event_type = event['eventType']
                    if last_event_type == 'WorkflowExecutionStarted':
                        if workflow_type == 'SerialWorkflow':
                            decisions.start_child_workflow_execution('SubWorkflow', self.version,
                                "subworkflow_1", task_list=self.task_list, input="sub_1")
                        elif workflow_type == 'SubWorkflow':
                            for i in range(2):
                                decisions.schedule_activity_task("activity_%d" % i, 'ActivityA', self.version, task_list='a_tasks')
                        else:
                            decisions.fail_workflow_execution(reason="Unknown workflow %s" % workflow_type)
                            break

                    elif last_event_type == 'ChildWorkflowExecutionCompleted':
                        decisions.schedule_activity_task("activity_2", 'ActivityB', self.version, task_list='b_tasks')

                    elif last_event_type == 'ActivityTaskCompleted':
                        attrs = event['activityTaskCompletedEventAttributes']
                        activity = events[attrs['scheduledEventId'] - 1]
                        activity_name = activity['activityTaskScheduledEventAttributes']['activityType']['name']

                        if activity_name == 'ActivityA':
                            completed_count = sum([1 for a in events if a['eventType'] == 'ActivityTaskCompleted'])
                            if completed_count == 2:
                                # Complete the child workflow
                                decisions.complete_workflow_execution()
                        elif activity_name == 'ActivityB':
                            # Complete the parent workflow
                            decisions.complete_workflow_execution()

                self.complete(decisions=decisions)
            return True

Misc
----

Some of these things are not obvious by reading the API documents, so hopefully they help you
avoid some time-consuming pitfalls.

Pagination
==========

When the decider polls for new tasks, the maximum number of events it will return at a time is 100
(configurable to a smaller number, but not larger). When running a workflow, this number gets quickly
exceeded. If it does, the decision task will contain a key ``nextPageToken`` which can be submit to the
``poll()`` call to get the next page of events.

.. code-block:: python

    decision_task = self.poll()

    events = []
    if 'events' in decision_task:
      events = decision_task['events']
      while 'nextPageToken' in decision_task:
          decision_task = self.poll(next_page_token=decision_task['nextPageToken'])
          if 'events' in decision_task:
              events += decision_task['events']

Depending on your workflow logic, you might not need to aggregate all of the events.

Decision Tasks
==============

When first running deciders and activities, it may seem that the decider gets called for every event that
an activity triggers; however, this is not the case. More than one event can happen between decision tasks.
The decision task will contain a key ``previousStartedEventId`` that lets you know the ``eventId`` of the
last DecisionTaskStarted event that was processed. Your script will need to handle all of the events
that have happened since then, not just the last activity.

.. code-block:: python

    workflow_events = [e for e in events if e['eventId'] > decision_task['previousStartedEventId']]

You may also wish to still filter out tasks that start with 'Decision' or filter it in some other way
that fulfills your needs. You will now have to iterate over the workflow_events list and respond to
each event, as it may contain multiple events.

Filtering Events
================

When running many tasks in parallel, a common task is searching through the history to see how many events
of a particular activity type started, completed, and/or failed. Some basic list comprehension makes
this trivial.

.. code-block:: python

    def filter_completed_events(self, events, type):
        completed = [e for e in events if e['eventType'] == 'ActivityTaskCompleted']
        orig = [events[e['activityTaskCompletedEventAttributes']['scheduledEventId']-1] for e in completed]
        return [e for e in orig if e['activityTaskScheduledEventAttributes']['activityType']['name'] == type]

.. _Amazon SWF API Reference: http://docs.aws.amazon.com/amazonswf/latest/apireference/Welcome.html
.. _StackOverflow questions: http://stackoverflow.com/questions/tagged/amazon-swf
.. _Miscellaneous Blog Articles: http://log.ooz.ie/search/label/SimpleWorkflow