summaryrefslogtreecommitdiff
path: root/taskflow
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow')
-rw-r--r--taskflow/test.py4
-rw-r--r--taskflow/tests/unit/action_engine/test_compile.py16
-rw-r--r--taskflow/tests/unit/patterns/test_graph_flow.py12
-rw-r--r--taskflow/tests/unit/test_check_transition.py6
-rw-r--r--taskflow/tests/unit/test_engine_helpers.py32
-rw-r--r--taskflow/tests/unit/test_engines.py2
-rw-r--r--taskflow/tests/unit/test_failure.py6
-rw-r--r--taskflow/tests/unit/test_flow_dependencies.py16
-rw-r--r--taskflow/tests/unit/test_functor_task.py4
-rw-r--r--taskflow/tests/unit/test_retries.py38
-rw-r--r--taskflow/tests/unit/test_storage.py40
-rw-r--r--taskflow/tests/unit/test_suspend.py6
-rw-r--r--taskflow/tests/unit/test_task.py22
13 files changed, 102 insertions, 102 deletions
diff --git a/taskflow/test.py b/taskflow/test.py
index 988afb4..ace97fc 100644
--- a/taskflow/test.py
+++ b/taskflow/test.py
@@ -122,8 +122,8 @@ class TestCase(base.BaseTestCase):
self.assertRaises(exc_class, access_func)
- def assertRaisesRegexp(self, exc_class, pattern, callable_obj,
- *args, **kwargs):
+ def assertRaisesRegex(self, exc_class, pattern, callable_obj,
+ *args, **kwargs):
# TODO(harlowja): submit a pull/review request to testtools to add
# this method to there codebase instead of having it exist in ours
# since it really doesn't belong here.
diff --git a/taskflow/tests/unit/action_engine/test_compile.py b/taskflow/tests/unit/action_engine/test_compile.py
index 48c51d6..1a310d1 100644
--- a/taskflow/tests/unit/action_engine/test_compile.py
+++ b/taskflow/tests/unit/action_engine/test_compile.py
@@ -54,8 +54,8 @@ class PatternCompileTest(test.TestCase):
def test_wrong_object(self):
msg_regex = '^Unknown object .* requested to compile'
- self.assertRaisesRegexp(TypeError, msg_regex,
- compiler.PatternCompiler(42).compile)
+ self.assertRaisesRegex(TypeError, msg_regex,
+ compiler.PatternCompiler(42).compile)
def test_empty(self):
flo = lf.Flow("test")
@@ -458,18 +458,18 @@ class PatternCompileTest(test.TestCase):
test_utils.DummyTask(name="a")
)
e = engines.load(flo)
- self.assertRaisesRegexp(exc.Duplicate,
- '^Atoms with duplicate names',
- e.compile)
+ self.assertRaisesRegex(exc.Duplicate,
+ '^Atoms with duplicate names',
+ e.compile)
def test_checks_for_dups_globally(self):
flo = gf.Flow("test").add(
gf.Flow("int1").add(test_utils.DummyTask(name="a")),
gf.Flow("int2").add(test_utils.DummyTask(name="a")))
e = engines.load(flo)
- self.assertRaisesRegexp(exc.Duplicate,
- '^Atoms with duplicate names',
- e.compile)
+ self.assertRaisesRegex(exc.Duplicate,
+ '^Atoms with duplicate names',
+ e.compile)
def test_retry_in_linear_flow(self):
flo = lf.Flow("test", retry.AlwaysRevert("c"))
diff --git a/taskflow/tests/unit/patterns/test_graph_flow.py b/taskflow/tests/unit/patterns/test_graph_flow.py
index 429f68f..8aab109 100644
--- a/taskflow/tests/unit/patterns/test_graph_flow.py
+++ b/taskflow/tests/unit/patterns/test_graph_flow.py
@@ -200,15 +200,15 @@ class GraphFlowTest(test.TestCase):
task1 = _task('task1')
task2 = _task('task2')
f = gf.Flow('test').add(task2)
- self.assertRaisesRegexp(ValueError, 'Node .* not found to link from',
- f.link, task1, task2)
+ self.assertRaisesRegex(ValueError, 'Node .* not found to link from',
+ f.link, task1, task2)
def test_graph_flow_link_to_unknown_node(self):
task1 = _task('task1')
task2 = _task('task2')
f = gf.Flow('test').add(task1)
- self.assertRaisesRegexp(ValueError, 'Node .* not found to link to',
- f.link, task1, task2)
+ self.assertRaisesRegex(ValueError, 'Node .* not found to link to',
+ f.link, task1, task2)
def test_graph_flow_link_raises_on_cycle(self):
task1 = _task('task1', provides=['a'])
@@ -288,8 +288,8 @@ class TargetedGraphFlowTest(test.TestCase):
task1 = _task('task1', provides=['a'], requires=[])
task2 = _task('task2', provides=['b'], requires=['a'])
f.add(task1)
- self.assertRaisesRegexp(ValueError, '^Node .* not found',
- f.set_target, task2)
+ self.assertRaisesRegex(ValueError, '^Node .* not found',
+ f.set_target, task2)
def test_targeted_flow_one_node(self):
f = gf.TargetedFlow("test")
diff --git a/taskflow/tests/unit/test_check_transition.py b/taskflow/tests/unit/test_check_transition.py
index a8b5a7c..d8bb594 100644
--- a/taskflow/tests/unit/test_check_transition.py
+++ b/taskflow/tests/unit/test_check_transition.py
@@ -33,9 +33,9 @@ class TransitionTest(test.TestCase):
self.assertFalse(self.check_transition(from_state, to_state), msg=msg)
def assertTransitionForbidden(self, from_state, to_state):
- self.assertRaisesRegexp(exc.InvalidState,
- self.transition_exc_regexp,
- self.check_transition, from_state, to_state)
+ self.assertRaisesRegex(exc.InvalidState,
+ self.transition_exc_regexp,
+ self.check_transition, from_state, to_state)
def assertTransitions(self, from_state, allowed=None, ignored=None,
forbidden=None):
diff --git a/taskflow/tests/unit/test_engine_helpers.py b/taskflow/tests/unit/test_engine_helpers.py
index 60fa755..74bca69 100644
--- a/taskflow/tests/unit/test_engine_helpers.py
+++ b/taskflow/tests/unit/test_engine_helpers.py
@@ -54,27 +54,27 @@ class FlowFromDetailTestCase(test.TestCase):
def test_no_meta(self):
_lb, flow_detail = p_utils.temporary_flow_detail()
self.assertEqual({}, flow_detail.meta)
- self.assertRaisesRegexp(ValueError,
- '^Cannot .* no factory information saved.$',
- taskflow.engines.flow_from_detail,
- flow_detail)
+ self.assertRaisesRegex(ValueError,
+ '^Cannot .* no factory information saved.$',
+ taskflow.engines.flow_from_detail,
+ flow_detail)
def test_no_factory_in_meta(self):
_lb, flow_detail = p_utils.temporary_flow_detail()
- self.assertRaisesRegexp(ValueError,
- '^Cannot .* no factory information saved.$',
- taskflow.engines.flow_from_detail,
- flow_detail)
+ self.assertRaisesRegex(ValueError,
+ '^Cannot .* no factory information saved.$',
+ taskflow.engines.flow_from_detail,
+ flow_detail)
def test_no_importable_function(self):
_lb, flow_detail = p_utils.temporary_flow_detail()
flow_detail.meta = dict(factory=dict(
name='you can not import me, i contain spaces'
))
- self.assertRaisesRegexp(ImportError,
- '^Could not import factory',
- taskflow.engines.flow_from_detail,
- flow_detail)
+ self.assertRaisesRegex(ImportError,
+ '^Could not import factory',
+ taskflow.engines.flow_from_detail,
+ flow_detail)
def test_no_arg_factory(self):
name = 'some.test.factory'
@@ -110,10 +110,10 @@ class LoadFromFactoryTestCase(test.TestCase):
def factory():
pass
- self.assertRaisesRegexp(ValueError,
- 'Flow factory .* is not reimportable',
- taskflow.engines.load_from_factory,
- factory)
+ self.assertRaisesRegex(ValueError,
+ 'Flow factory .* is not reimportable',
+ taskflow.engines.load_from_factory,
+ factory)
def test_it_works(self):
engine = taskflow.engines.load_from_factory(
diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py
index 5a70d84..6fd86f2 100644
--- a/taskflow/tests/unit/test_engines.py
+++ b/taskflow/tests/unit/test_engines.py
@@ -1390,7 +1390,7 @@ class EngineCheckingTaskTest(utils.EngineTestBase):
utils.FailingTask('fail1')
)
engine = self._make_engine(flow)
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
class SerialEngineTest(EngineTaskTest,
diff --git a/taskflow/tests/unit/test_failure.py b/taskflow/tests/unit/test_failure.py
index 99c2e70..217f1b3 100644
--- a/taskflow/tests/unit/test_failure.py
+++ b/taskflow/tests/unit/test_failure.py
@@ -89,7 +89,7 @@ class CaptureFailureTestCase(test.TestCase, GeneralFailureObjTestsMixin):
self.assertIs(exc_info[1], self.fail_obj.exception)
def test_reraises(self):
- self.assertRaisesRegexp(RuntimeError, '^Woot!$', self.fail_obj.reraise)
+ self.assertRaisesRegex(RuntimeError, '^Woot!$', self.fail_obj.reraise)
class ReCreatedFailureTestCase(test.TestCase, GeneralFailureObjTestsMixin):
@@ -209,8 +209,8 @@ class FailureObjectTestCase(test.TestCase):
def test_reraises_one(self):
fls = [_captured_failure('Woot!')]
- self.assertRaisesRegexp(RuntimeError, '^Woot!$',
- failure.Failure.reraise_if_any, fls)
+ self.assertRaisesRegex(RuntimeError, '^Woot!$',
+ failure.Failure.reraise_if_any, fls)
def test_reraises_several(self):
fls = [
diff --git a/taskflow/tests/unit/test_flow_dependencies.py b/taskflow/tests/unit/test_flow_dependencies.py
index 54f857b..0f60a91 100644
--- a/taskflow/tests/unit/test_flow_dependencies.py
+++ b/taskflow/tests/unit/test_flow_dependencies.py
@@ -233,14 +233,14 @@ class FlowDependenciesTest(test.TestCase):
def test_graph_cyclic_dependency(self):
flow = gf.Flow('g-3-cyclic')
- self.assertRaisesRegexp(exceptions.DependencyFailure, '^No path',
- flow.add,
- utils.TaskOneArgOneReturn(provides='a',
- requires=['b']),
- utils.TaskOneArgOneReturn(provides='b',
- requires=['c']),
- utils.TaskOneArgOneReturn(provides='c',
- requires=['a']))
+ self.assertRaisesRegex(exceptions.DependencyFailure, '^No path',
+ flow.add,
+ utils.TaskOneArgOneReturn(provides='a',
+ requires=['b']),
+ utils.TaskOneArgOneReturn(provides='b',
+ requires=['c']),
+ utils.TaskOneArgOneReturn(provides='c',
+ requires=['a']))
def test_task_requires_and_provides_same_values(self):
flow = lf.Flow('lf', utils.TaskOneArgOneReturn('rt', requires='x',
diff --git a/taskflow/tests/unit/test_functor_task.py b/taskflow/tests/unit/test_functor_task.py
index cc45720..ff44bac 100644
--- a/taskflow/tests/unit/test_functor_task.py
+++ b/taskflow/tests/unit/test_functor_task.py
@@ -65,8 +65,8 @@ class FunctorTaskTest(test.TestCase):
t(bof.run_one, revert=bof.revert_one),
t(bof.run_fail)
)
- self.assertRaisesRegexp(RuntimeError, '^Woot',
- taskflow.engines.run, flow)
+ self.assertRaisesRegex(RuntimeError, '^Woot',
+ taskflow.engines.run, flow)
self.assertEqual(['one', 'fail', 'revert one'], values)
def test_lambda_functors(self):
diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py
index 8532997..3eb5b20 100644
--- a/taskflow/tests/unit/test_retries.py
+++ b/taskflow/tests/unit/test_retries.py
@@ -103,7 +103,7 @@ class RetryTest(utils.EngineTestBase):
engine = self._make_engine(flow)
engine.storage.inject({'y': 4})
with utils.CaptureListener(engine) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
self.assertEqual({'y': 4}, engine.storage.fetch_all())
expected = ['flow-1.f RUNNING',
'r1.r RUNNING',
@@ -142,7 +142,7 @@ class RetryTest(utils.EngineTestBase):
engine = self._make_engine(flow)
engine.storage.inject({'y': 4})
with utils.CaptureListener(engine) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Gotcha', engine.run)
self.assertEqual({'y': 4, 'x': 1}, engine.storage.fetch_all())
expected = ['flow-1.f RUNNING',
'r1.r RUNNING',
@@ -468,7 +468,7 @@ class RetryTest(utils.EngineTestBase):
engine = self._make_engine(flow)
engine.storage.inject({'y': 2})
with utils.CaptureListener(engine) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
self.assertEqual({'y': 2}, engine.storage.fetch_all())
expected = ['flow-1.f RUNNING',
'r1.r RUNNING',
@@ -494,8 +494,8 @@ class RetryTest(utils.EngineTestBase):
flow = lf.Flow('test', retry=utils.OneReturnRetry(provides='x')).add(
utils.FailingTask('fail'))
engine = self._make_engine(flow)
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
def test_run_just_retry(self):
flow = utils.OneReturnRetry(provides='x')
@@ -596,7 +596,7 @@ class RetryTest(utils.EngineTestBase):
utils.FailingTask('t2'))
engine = self._make_engine(flow)
with utils.CaptureListener(engine) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'r1.r RUNNING',
'r1.r SUCCESS(1)',
@@ -645,7 +645,7 @@ class RetryTest(utils.EngineTestBase):
flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1'))
engine = self._make_engine(flow)
with utils.CaptureListener(engine) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'r1.r RUNNING',
'r1.r SUCCESS(3)',
@@ -688,7 +688,7 @@ class RetryTest(utils.EngineTestBase):
flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1'))
engine = self._make_engine(flow)
with utils.CaptureListener(engine) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'r1.r RUNNING',
'r1.r SUCCESS(2)',
@@ -728,7 +728,7 @@ class RetryTest(utils.EngineTestBase):
)
engine = self._make_engine(flow)
with utils.CaptureListener(engine) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'task1.t RUNNING',
'task1.t SUCCESS(5)',
@@ -778,7 +778,7 @@ class RetryTest(utils.EngineTestBase):
)
engine = self._make_engine(flow)
with utils.CaptureListener(engine) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'task1.t RUNNING',
'task1.t SUCCESS(5)',
@@ -825,7 +825,7 @@ class RetryTest(utils.EngineTestBase):
flow = lf.Flow('flow-1', retry1).add(utils.ConditionalTask('t1'))
engine = self._make_engine(flow)
engine.storage.inject({'y': 1})
- self.assertRaisesRegexp(exc.NotFound, '^No elements left', engine.run)
+ self.assertRaisesRegex(exc.NotFound, '^No elements left', engine.run)
def test_parameterized_for_each_with_list(self):
values = [3, 2, 5]
@@ -834,7 +834,7 @@ class RetryTest(utils.EngineTestBase):
engine = self._make_engine(flow)
engine.storage.inject({'values': values, 'y': 1})
with utils.CaptureListener(engine) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'r1.r RUNNING',
'r1.r SUCCESS(3)',
@@ -870,7 +870,7 @@ class RetryTest(utils.EngineTestBase):
engine = self._make_engine(flow)
engine.storage.inject({'values': values, 'y': 1})
with utils.CaptureListener(engine) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'r1.r RUNNING',
'r1.r SUCCESS(3)',
@@ -911,7 +911,7 @@ class RetryTest(utils.EngineTestBase):
engine = self._make_engine(flow)
engine.storage.inject({'values': values, 'y': 1})
with utils.CaptureListener(engine) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'task-1.t RUNNING',
'task-1.t SUCCESS(5)',
@@ -955,7 +955,7 @@ class RetryTest(utils.EngineTestBase):
engine = self._make_engine(flow)
engine.storage.inject({'values': values, 'y': 1})
with utils.CaptureListener(engine) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'task-1.t RUNNING',
'task-1.t SUCCESS(5)',
@@ -994,7 +994,7 @@ class RetryTest(utils.EngineTestBase):
flow = lf.Flow('flow-1', retry1).add(utils.ConditionalTask('t1'))
engine = self._make_engine(flow)
engine.storage.inject({'values': values, 'y': 1})
- self.assertRaisesRegexp(exc.NotFound, '^No elements left', engine.run)
+ self.assertRaisesRegex(exc.NotFound, '^No elements left', engine.run)
def _pretend_to_run_a_flow_and_crash(self, when):
flow = uf.Flow('flow-1', retry.Times(3, provides='x')).add(
@@ -1109,7 +1109,7 @@ class RetryTest(utils.EngineTestBase):
r = FailingRetry()
flow = lf.Flow('testflow', r)
engine = self._make_engine(flow)
- self.assertRaisesRegexp(ValueError, '^OMG', engine.run)
+ self.assertRaisesRegex(ValueError, '^OMG', engine.run)
self.assertEqual(1, len(engine.storage.get_retry_histories()))
self.assertEqual(0, len(r.history))
self.assertEqual([], list(r.history.outcomes_iter()))
@@ -1120,7 +1120,7 @@ class RetryTest(utils.EngineTestBase):
r = NastyFailingRetry()
flow = lf.Flow('testflow', r)
engine = self._make_engine(flow)
- self.assertRaisesRegexp(ValueError, '^WOOT', engine.run)
+ self.assertRaisesRegex(ValueError, '^WOOT', engine.run)
def test_nested_provides_graph_reverts_correctly(self):
flow = gf.Flow("test").add(
@@ -1135,7 +1135,7 @@ class RetryTest(utils.EngineTestBase):
engine.storage.save('b', 11)
engine.storage.save('a', 10)
with utils.CaptureListener(engine, capture_flow=False) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
expected = ['c.t RUNNING',
'c.t FAILURE(Failure: RuntimeError: Woot!)',
'a.t REVERTING',
diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py
index 7cc904b..b3f7666 100644
--- a/taskflow/tests/unit/test_storage.py
+++ b/taskflow/tests/unit/test_storage.py
@@ -191,9 +191,9 @@ class StorageTestMixin(object):
def test_fetch_unknown_name(self):
s = self._get_storage()
- self.assertRaisesRegexp(exceptions.NotFound,
- "^Name 'xxx' is not mapped",
- s.fetch, 'xxx')
+ self.assertRaisesRegex(exceptions.NotFound,
+ "^Name 'xxx' is not mapped",
+ s.fetch, 'xxx')
def test_flow_metadata_update(self):
s = self._get_storage()
@@ -375,8 +375,8 @@ class StorageTestMixin(object):
def test_get_state_of_unknown_task(self):
s = self._get_storage()
- self.assertRaisesRegexp(exceptions.NotFound, '^Unknown',
- s.get_atom_state, 'my task')
+ self.assertRaisesRegex(exceptions.NotFound, '^Unknown',
+ s.get_atom_state, 'my task')
def test_task_by_name(self):
s = self._get_storage()
@@ -414,9 +414,9 @@ class StorageTestMixin(object):
def test_unknown_task_by_name(self):
s = self._get_storage()
- self.assertRaisesRegexp(exceptions.NotFound,
- '^Unknown atom',
- s.get_atom_uuid, '42')
+ self.assertRaisesRegex(exceptions.NotFound,
+ '^Unknown atom',
+ s.get_atom_uuid, '42')
def test_initial_flow_state(self):
s = self._get_storage()
@@ -439,23 +439,23 @@ class StorageTestMixin(object):
s = self._get_storage()
s.ensure_atom(test_utils.NoopTask('my task', provides=set(['result'])))
s.save('my task', {})
- self.assertRaisesRegexp(exceptions.NotFound,
- '^Unable to find result', s.fetch, 'result')
+ self.assertRaisesRegex(exceptions.NotFound,
+ '^Unable to find result', s.fetch, 'result')
def test_empty_result_is_checked(self):
s = self._get_storage()
s.ensure_atom(test_utils.NoopTask('my task', provides=['a']))
s.save('my task', ())
- self.assertRaisesRegexp(exceptions.NotFound,
- '^Unable to find result', s.fetch, 'a')
+ self.assertRaisesRegex(exceptions.NotFound,
+ '^Unable to find result', s.fetch, 'a')
def test_short_result_is_checked(self):
s = self._get_storage()
s.ensure_atom(test_utils.NoopTask('my task', provides=['a', 'b']))
s.save('my task', ['result'])
self.assertEqual('result', s.fetch('a'))
- self.assertRaisesRegexp(exceptions.NotFound,
- '^Unable to find result', s.fetch, 'b')
+ self.assertRaisesRegex(exceptions.NotFound,
+ '^Unable to find result', s.fetch, 'b')
def test_ensure_retry(self):
s = self._get_storage()
@@ -466,9 +466,9 @@ class StorageTestMixin(object):
def test_ensure_retry_and_task_with_same_name(self):
s = self._get_storage()
s.ensure_atom(test_utils.NoopTask('my retry'))
- self.assertRaisesRegexp(exceptions.Duplicate,
- '^Atom detail', s.ensure_atom,
- test_utils.NoopRetry('my retry'))
+ self.assertRaisesRegex(exceptions.Duplicate,
+ '^Atom detail', s.ensure_atom,
+ test_utils.NoopRetry('my retry'))
def test_save_retry_results(self):
s = self._get_storage()
@@ -516,9 +516,9 @@ class StorageTestMixin(object):
self.assertEqual({'my retry': a_failure}, s.get_failures())
def test_logbook_get_unknown_atom_type(self):
- self.assertRaisesRegexp(TypeError,
- 'Unknown atom',
- models.atom_detail_class, 'some_detail')
+ self.assertRaisesRegex(TypeError,
+ 'Unknown atom',
+ models.atom_detail_class, 'some_detail')
def test_save_task_intention(self):
s = self._get_storage()
diff --git a/taskflow/tests/unit/test_suspend.py b/taskflow/tests/unit/test_suspend.py
index 6559bfc..3a9a8ad 100644
--- a/taskflow/tests/unit/test_suspend.py
+++ b/taskflow/tests/unit/test_suspend.py
@@ -102,7 +102,7 @@ class SuspendTest(utils.EngineTestBase):
'b.t REVERTED(None)']
self.assertEqual(expected, capturer.values)
with utils.CaptureListener(engine, capture_flow=False) as capturer:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
self.assertEqual(states.REVERTED, engine.storage.get_flow_state())
expected = ['a.t REVERTING', 'a.t REVERTED(None)']
self.assertEqual(expected, capturer.values)
@@ -132,7 +132,7 @@ class SuspendTest(utils.EngineTestBase):
# pretend we are resuming
engine2 = self._make_engine(flow, engine.storage._flowdetail)
with utils.CaptureListener(engine2, capture_flow=False) as capturer2:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine2.run)
self.assertEqual(states.REVERTED, engine2.storage.get_flow_state())
expected = ['a.t REVERTING',
'a.t REVERTED(None)']
@@ -169,7 +169,7 @@ class SuspendTest(utils.EngineTestBase):
)
engine2 = self._make_engine(flow2, engine.storage._flowdetail)
with utils.CaptureListener(engine2, capture_flow=False) as capturer2:
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run)
+ self.assertRaisesRegex(RuntimeError, '^Woot', engine2.run)
self.assertEqual(states.REVERTED, engine2.storage.get_flow_state())
expected = ['a.t REVERTING', 'a.t REVERTED(None)']
self.assertEqual(expected, capturer2.values)
diff --git a/taskflow/tests/unit/test_task.py b/taskflow/tests/unit/test_task.py
index f008ab5..32b3e45 100644
--- a/taskflow/tests/unit/test_task.py
+++ b/taskflow/tests/unit/test_task.py
@@ -109,8 +109,8 @@ class TaskTest(test.TestCase):
self.assertEqual({'food': 0}, my_task.save_as)
def test_bad_provides(self):
- self.assertRaisesRegexp(TypeError, '^Atom provides',
- MyTask, provides=object())
+ self.assertRaisesRegex(TypeError, '^Atom provides',
+ MyTask, provides=object())
def test_requires_by_default(self):
my_task = MyTask()
@@ -144,9 +144,9 @@ class TaskTest(test.TestCase):
self.assertEqual(expected, my_task.rebind)
def test_requires_explicit_not_enough(self):
- self.assertRaisesRegexp(ValueError, '^Missing arguments',
- MyTask,
- auto_extract=False, requires=('spam', 'eggs'))
+ self.assertRaisesRegex(ValueError, '^Missing arguments',
+ MyTask,
+ auto_extract=False, requires=('spam', 'eggs'))
def test_requires_ignores_optional(self):
my_task = DefaultArgTask()
@@ -189,8 +189,8 @@ class TaskTest(test.TestCase):
my_task.requires)
def test_rebind_unknown(self):
- self.assertRaisesRegexp(ValueError, '^Extra arguments',
- MyTask, rebind={'foo': 'bar'})
+ self.assertRaisesRegex(ValueError, '^Extra arguments',
+ MyTask, rebind={'foo': 'bar'})
def test_rebind_unknown_kwargs(self):
my_task = KwargsTask(rebind={'foo': 'bar'})
@@ -223,8 +223,8 @@ class TaskTest(test.TestCase):
my_task.requires)
def test_rebind_list_more(self):
- self.assertRaisesRegexp(ValueError, '^Extra arguments',
- MyTask, rebind=('a', 'b', 'c', 'd'))
+ self.assertRaisesRegex(ValueError, '^Extra arguments',
+ MyTask, rebind=('a', 'b', 'c', 'd'))
def test_rebind_list_more_kwargs(self):
my_task = KwargsTask(rebind=('a', 'b', 'c'))
@@ -238,8 +238,8 @@ class TaskTest(test.TestCase):
my_task.requires)
def test_rebind_list_bad_value(self):
- self.assertRaisesRegexp(TypeError, '^Invalid rebind value',
- MyTask, rebind=object())
+ self.assertRaisesRegex(TypeError, '^Invalid rebind value',
+ MyTask, rebind=object())
def test_default_provides(self):
my_task = DefaultProvidesTask()