diff options
Diffstat (limited to 'tests/swf/test_layer1_workflow_execution.py')
-rw-r--r-- | tests/swf/test_layer1_workflow_execution.py | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/tests/swf/test_layer1_workflow_execution.py b/tests/swf/test_layer1_workflow_execution.py new file mode 100644 index 00000000..2f47130f --- /dev/null +++ b/tests/swf/test_layer1_workflow_execution.py @@ -0,0 +1,171 @@ +""" +Tests for Layer1 of Simple Workflow + +""" +import time +import uuid +import json +import traceback + +from boto.swf.layer1_decisions import Layer1Decisions + +from test_layer1 import SimpleWorkflowLayer1TestBase + + + +class SwfL1WorkflowExecutionTest(SimpleWorkflowLayer1TestBase): + """ + test a simple workflow execution + """ + def run_decider(self): + """ + run one iteration of a simple decision engine + """ + # Poll for a decision task. + tries = 0 + while 1: + dtask = self.conn.poll_for_decision_task(self._domain, + self._task_list, reverse_order=True) + if dtask.get('taskToken') is not None: + # This means a real decision task has arrived. + break + time.sleep(2) + tries += 1 + if tries > 10: + # Give up if it's taking too long. Probably + # means something is broken somewhere else. + assert False, 'no decision task occurred' + + # Get the most recent interesting event. + ignorable = ( + 'DecisionTaskScheduled', + 'DecisionTaskStarted', + 'DecisionTaskTimedOut', + ) + event = None + for tevent in dtask['events']: + if tevent['eventType'] not in ignorable: + event = tevent + break + + # Construct the decision response. + decisions = Layer1Decisions() + if event['eventType'] == 'WorkflowExecutionStarted': + activity_id = str(uuid.uuid1()) + decisions.schedule_activity_task(activity_id, + self._activity_type_name, self._activity_type_version, + task_list=self._task_list, + input=event['workflowExecutionStartedEventAttributes']['input']) + elif event['eventType'] == 'ActivityTaskCompleted': + decisions.complete_workflow_execution( + result=event['activityTaskCompletedEventAttributes']['result']) + elif event['eventType'] == 'ActivityTaskFailed': + decisions.fail_workflow_execution( + reason=event['activityTaskFailedEventAttributes']['reason'], + details=event['activityTaskFailedEventAttributes']['details']) + else: + decisions.fail_workflow_execution( + reason='unhandled decision task type; %r' % (event['eventType'],)) + + # Send the decision response. + r = self.conn.respond_decision_task_completed(dtask['taskToken'], + decisions=decisions._data, + execution_context=None) + assert r is None + + + def run_worker(self): + """ + run one iteration of a simple worker engine + """ + # Poll for an activity task. + tries = 0 + while 1: + atask = self.conn.poll_for_activity_task(self._domain, + self._task_list, identity='test worker') + if atask.get('activityId') is not None: + # This means a real activity task has arrived. + break + time.sleep(2) + tries += 1 + if tries > 10: + # Give up if it's taking too long. Probably + # means something is broken somewhere else. + assert False, 'no activity task occurred' + # Do the work or catch a "work exception." + reason = None + try: + result = json.dumps(sum(json.loads(atask['input']))) + except: + reason = 'an exception was raised' + details = traceback.format_exc() + if reason is None: + r = self.conn.respond_activity_task_completed( + atask['taskToken'], result) + else: + r = self.conn.respond_activity_task_failed( + atask['taskToken'], reason=reason, details=details) + assert r is None + + + def test_workflow_execution(self): + # Start a workflow execution whose activity task will succeed. + workflow_id = 'wfid-%.2f' % (time.time(),) + r = self.conn.start_workflow_execution(self._domain, + workflow_id, + self._workflow_type_name, + self._workflow_type_version, + execution_start_to_close_timeout='20', + input='[600, 15]') + # Need the run_id to lookup the execution history later. + run_id = r['runId'] + + # Move the workflow execution forward by having the + # decider schedule an activity task. + self.run_decider() + + # Run the worker to handle the scheduled activity task. + self.run_worker() + + # Complete the workflow execution by having the + # decider close it down. + self.run_decider() + + # Check that the result was stored in the execution history. + r = self.conn.get_workflow_execution_history(self._domain, + run_id, workflow_id, + reverse_order=True)['events'][0] + result = r['workflowExecutionCompletedEventAttributes']['result'] + assert json.loads(result) == 615 + + + def test_failed_workflow_execution(self): + # Start a workflow execution whose activity task will fail. + workflow_id = 'wfid-%.2f' % (time.time(),) + r = self.conn.start_workflow_execution(self._domain, + workflow_id, + self._workflow_type_name, + self._workflow_type_version, + execution_start_to_close_timeout='20', + input='[600, "s"]') + # Need the run_id to lookup the execution history later. + run_id = r['runId'] + + # Move the workflow execution forward by having the + # decider schedule an activity task. + self.run_decider() + + # Run the worker to handle the scheduled activity task. + self.run_worker() + + # Complete the workflow execution by having the + # decider close it down. + self.run_decider() + + # Check that the failure was stored in the execution history. + r = self.conn.get_workflow_execution_history(self._domain, + run_id, workflow_id, + reverse_order=True)['events'][0] + reason = r['workflowExecutionFailedEventAttributes']['reason'] + assert reason == 'an exception was raised' + |