diff options
| author | Robert Collins <robertc@robertcollins.net> | 2012-05-02 21:45:14 +1200 |
|---|---|---|
| committer | Robert Collins <robertc@robertcollins.net> | 2012-05-02 21:45:14 +1200 |
| commit | 33b73acea72a1799f42f3fabf2034abf2d114964 (patch) | |
| tree | 18e77cdde71a65a21e54390f56c1f057f7b25f92 /python/subunit | |
| parent | c230e78f9e3f9a464c011b85779700452c647dfb (diff) | |
| parent | f42ef2fb24def4c19433e18db492416983c56e32 (diff) | |
| download | subunit-git-33b73acea72a1799f42f3fabf2034abf2d114964.tar.gz | |
* Tags can now be filtered. (Jonathan Lange, #664171)
Diffstat (limited to 'python/subunit')
| -rw-r--r-- | python/subunit/test_results.py | 296 | ||||
| -rw-r--r-- | python/subunit/tests/test_subunit_filter.py | 137 |
2 files changed, 319 insertions, 114 deletions
diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py index b800be9..bb67df0 100644 --- a/python/subunit/test_results.py +++ b/python/subunit/test_results.py @@ -20,6 +20,7 @@ import csv import datetime import testtools +from testtools.compat import all from testtools.content import ( text_content, TracebackContent, @@ -39,6 +40,9 @@ class TestResultDecorator(object): or features by degrading them. """ + # XXX: Since lp:testtools r250, this is in testtools. Once it's released, + # we should gut this and just use that. + def __init__(self, decorated): """Create a TestResultDecorator forwarding to decorated.""" # Make every decorator degrade gracefully. @@ -205,47 +209,44 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator): return self.decorated.time(a_datetime) -class TagCollapsingDecorator(HookedTestResultDecorator): - """Collapses many 'tags' calls into one where possible.""" +class TagsMixin(object): - def __init__(self, result): - super(TagCollapsingDecorator, self).__init__(result) + def __init__(self): self._clear_tags() def _clear_tags(self): self._global_tags = set(), set() - self._current_test_tags = None - - def _get_current_tags(self): - if self._current_test_tags: - return self._current_test_tags + self._test_tags = None + + def _get_active_tags(self): + global_new, global_gone = self._global_tags + if self._test_tags is None: + return set(global_new) + test_new, test_gone = self._test_tags + return global_new.difference(test_gone).union(test_new) + + def _get_current_scope(self): + if self._test_tags: + return self._test_tags return self._global_tags + def _flush_current_scope(self, tag_receiver): + new_tags, gone_tags = self._get_current_scope() + if new_tags or gone_tags: + tag_receiver.tags(new_tags, gone_tags) + if self._test_tags: + self._test_tags = set(), set() + else: + self._global_tags = set(), set() + def startTestRun(self): - super(TagCollapsingDecorator, self).startTestRun() self._clear_tags() def startTest(self, test): - """Start a test. - - Not directly passed to the client, but used for handling of tags - correctly. - """ - super(TagCollapsingDecorator, self).startTest(test) - self._current_test_tags = set(), set() + self._test_tags = set(), set() def stopTest(self, test): - super(TagCollapsingDecorator, self).stopTest(test) - self._current_test_tags = None - - def _before_event(self): - new_tags, gone_tags = self._get_current_tags() - if new_tags or gone_tags: - self.decorated.tags(new_tags, gone_tags) - if self._current_test_tags: - self._current_test_tags = set(), set() - else: - self._global_tags = set(), set() + self._test_tags = None def tags(self, new_tags, gone_tags): """Handle tag instructions. @@ -256,13 +257,27 @@ class TagCollapsingDecorator(HookedTestResultDecorator): :param new_tags: Tags to add, :param gone_tags: Tags to remove. """ - current_new_tags, current_gone_tags = self._get_current_tags() + current_new_tags, current_gone_tags = self._get_current_scope() current_new_tags.update(new_tags) current_new_tags.difference_update(gone_tags) current_gone_tags.update(gone_tags) current_gone_tags.difference_update(new_tags) +class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin): + """Collapses many 'tags' calls into one where possible.""" + + def __init__(self, result): + super(TagCollapsingDecorator, self).__init__(result) + self._clear_tags() + + def _before_event(self): + self._flush_current_scope(self.decorated) + + def tags(self, new_tags, gone_tags): + TagsMixin.tags(self, new_tags, gone_tags) + + class TimeCollapsingDecorator(HookedTestResultDecorator): """Only pass on the first and last of a consecutive sequence of times.""" @@ -288,93 +303,58 @@ class TimeCollapsingDecorator(HookedTestResultDecorator): self._last_received_time = a_time -def all_true(bools): - """Return True if all of 'bools' are True. False otherwise.""" - for b in bools: - if not b: - return False - return True +def and_predicates(predicates): + """Return a predicate that is true iff all predicates are true.""" + # XXX: Should probably be in testtools to be better used by matchers. jml + return lambda *args, **kwargs: all(p(*args, **kwargs) for p in predicates) -class TestResultFilter(TestResultDecorator): - """A pyunit TestResult interface implementation which filters tests. +def _make_tag_filter(with_tags, without_tags): + """Make a callback that checks tests against tags.""" - Tests that pass the filter are handed on to another TestResult instance - for further processing/reporting. To obtain the filtered results, - the other instance must be interrogated. + with_tags = with_tags and set(with_tags) or None + without_tags = without_tags and set(without_tags) or None - :ivar result: The result that tests are passed to after filtering. - :ivar filter_predicate: The callback run to decide whether to pass - a result. - """ + def check_tags(test, outcome, err, details, tags): + if with_tags and not with_tags <= tags: + return False + if without_tags and bool(without_tags & tags): + return False + return True - def __init__(self, result, filter_error=False, filter_failure=False, - filter_success=True, filter_skip=False, filter_xfail=False, - filter_predicate=None, fixup_expected_failures=None): - """Create a FilterResult object filtering to result. + return check_tags - :param filter_error: Filter out errors. - :param filter_failure: Filter out failures. - :param filter_success: Filter out successful tests. - :param filter_skip: Filter out skipped tests. - :param filter_xfail: Filter out expected failure tests. - :param filter_predicate: A callable taking (test, outcome, err, - details) and returning True if the result should be passed - through. err and details may be none if no error or extra - metadata is available. outcome is the name of the outcome such - as 'success' or 'failure'. - :param fixup_expected_failures: Set of test ids to consider known - failing. - """ - super(TestResultFilter, self).__init__(result) + +class _PredicateFilter(TestResultDecorator, TagsMixin): + + def __init__(self, result, predicate): + super(_PredicateFilter, self).__init__(result) + self._clear_tags() self.decorated = TimeCollapsingDecorator( TagCollapsingDecorator(self.decorated)) - predicates = [] - if filter_error: - predicates.append(lambda t, outcome, e, d: outcome != 'error') - if filter_failure: - predicates.append(lambda t, outcome, e, d: outcome != 'failure') - if filter_success: - predicates.append(lambda t, outcome, e, d: outcome != 'success') - if filter_skip: - predicates.append(lambda t, outcome, e, d: outcome != 'skip') - if filter_xfail: - predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure') - if filter_predicate is not None: - predicates.append(filter_predicate) - self.filter_predicate = ( - lambda test, outcome, err, details: - all_true(p(test, outcome, err, details) for p in predicates)) + self._predicate = predicate # The current test (for filtering tags) self._current_test = None # Has the current test been filtered (for outputting test tags) self._current_test_filtered = None # Calls to this result that we don't know whether to forward on yet. self._buffered_calls = [] - if fixup_expected_failures is None: - self._fixup_expected_failures = frozenset() - else: - self._fixup_expected_failures = fixup_expected_failures + + def filter_predicate(self, test, outcome, error, details): + return self._predicate( + test, outcome, error, details, self._get_active_tags()) def addError(self, test, err=None, details=None): if (self.filter_predicate(test, 'error', err, details)): - if self._failure_expected(test): - self._buffered_calls.append( - ('addExpectedFailure', [test, err], {'details': details})) - else: - self._buffered_calls.append( - ('addError', [test, err], {'details': details})) + self._buffered_calls.append( + ('addError', [test, err], {'details': details})) else: self._filtered() def addFailure(self, test, err=None, details=None): if (self.filter_predicate(test, 'failure', err, details)): - if self._failure_expected(test): - self._buffered_calls.append( - ('addExpectedFailure', [test, err], {'details': details})) - else: - self._buffered_calls.append( - ('addFailure', [test, err], {'details': details})) + self._buffered_calls.append( + ('addFailure', [test, err], {'details': details})) else: self._filtered() @@ -385,17 +365,6 @@ class TestResultFilter(TestResultDecorator): else: self._filtered() - def addSuccess(self, test, details=None): - if (self.filter_predicate(test, 'success', None, details)): - if self._failure_expected(test): - self._buffered_calls.append( - ('addUnexpectedSuccess', [test], {'details': details})) - else: - self._buffered_calls.append( - ('addSuccess', [test], {'details': details})) - else: - self._filtered() - def addExpectedFailure(self, test, err=None, details=None): if self.filter_predicate(test, 'expectedfailure', err, details): self._buffered_calls.append( @@ -407,18 +376,23 @@ class TestResultFilter(TestResultDecorator): self._buffered_calls.append( ('addUnexpectedSuccess', [test], {'details': details})) + def addSuccess(self, test, details=None): + if (self.filter_predicate(test, 'success', None, details)): + self._buffered_calls.append( + ('addSuccess', [test], {'details': details})) + else: + self._filtered() + def _filtered(self): self._current_test_filtered = True - def _failure_expected(self, test): - return (test.id() in self._fixup_expected_failures) - def startTest(self, test): """Start a test. Not directly passed to the client, but used for handling of tags correctly. """ + TagsMixin.startTest(self, test) self._current_test = test self._current_test_filtered = False self._buffered_calls.append(('startTest', [test], {})) @@ -430,16 +404,26 @@ class TestResultFilter(TestResultDecorator): correctly. """ if not self._current_test_filtered: - # Tags to output for this test. for method, args, kwargs in self._buffered_calls: getattr(self.decorated, method)(*args, **kwargs) self.decorated.stopTest(test) self._current_test = None self._current_test_filtered = None self._buffered_calls = [] + TagsMixin.stopTest(self, test) + + def tags(self, new_tags, gone_tags): + TagsMixin.tags(self, new_tags, gone_tags) + if self._current_test is not None: + self._buffered_calls.append(('tags', [new_tags, gone_tags], {})) + else: + return super(_PredicateFilter, self).tags(new_tags, gone_tags) def time(self, a_time): - return self.decorated.time(a_time) + if self._current_test is not None: + self._buffered_calls.append(('time', [a_time], {})) + else: + return self.decorated.time(a_time) def id_to_orig_id(self, id): if id.startswith("subunit.RemotedTestCase."): @@ -447,6 +431,95 @@ class TestResultFilter(TestResultDecorator): return id +class TestResultFilter(TestResultDecorator): + """A pyunit TestResult interface implementation which filters tests. + + Tests that pass the filter are handed on to another TestResult instance + for further processing/reporting. To obtain the filtered results, + the other instance must be interrogated. + + :ivar result: The result that tests are passed to after filtering. + :ivar filter_predicate: The callback run to decide whether to pass + a result. + """ + + def __init__(self, result, filter_error=False, filter_failure=False, + filter_success=True, filter_skip=False, filter_xfail=False, + filter_predicate=None, fixup_expected_failures=None): + """Create a FilterResult object filtering to result. + + :param filter_error: Filter out errors. + :param filter_failure: Filter out failures. + :param filter_success: Filter out successful tests. + :param filter_skip: Filter out skipped tests. + :param filter_xfail: Filter out expected failure tests. + :param filter_predicate: A callable taking (test, outcome, err, + details, tags) and returning True if the result should be passed + through. err and details may be none if no error or extra + metadata is available. outcome is the name of the outcome such + as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters + are still supported but should be updated to accept the tags + parameter for efficiency. + :param fixup_expected_failures: Set of test ids to consider known + failing. + """ + predicates = [] + if filter_error: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'error') + if filter_failure: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'failure') + if filter_success: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'success') + if filter_skip: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'skip') + if filter_xfail: + predicates.append( + lambda t, outcome, e, d, tags: outcome != 'expectedfailure') + if filter_predicate is not None: + def compat(test, outcome, error, details, tags): + # 0.0.7 and earlier did not support the 'tags' parameter. + try: + return filter_predicate( + test, outcome, error, details, tags) + except TypeError: + return filter_predicate(test, outcome, error, details) + predicates.append(compat) + predicate = and_predicates(predicates) + super(TestResultFilter, self).__init__( + _PredicateFilter(result, predicate)) + if fixup_expected_failures is None: + self._fixup_expected_failures = frozenset() + else: + self._fixup_expected_failures = fixup_expected_failures + + def addError(self, test, err=None, details=None): + if self._failure_expected(test): + self.addExpectedFailure(test, err=err, details=details) + else: + super(TestResultFilter, self).addError( + test, err=err, details=details) + + def addFailure(self, test, err=None, details=None): + if self._failure_expected(test): + self.addExpectedFailure(test, err=err, details=details) + else: + super(TestResultFilter, self).addFailure( + test, err=err, details=details) + + def addSuccess(self, test, details=None): + if self._failure_expected(test): + self.addUnexpectedSuccess(test, details=details) + else: + super(TestResultFilter, self).addSuccess(test, details=details) + + def _failure_expected(self, test): + return (test.id() in self._fixup_expected_failures) + + class TestIdPrintingResult(testtools.TestResult): def __init__(self, stream, show_times=False): @@ -510,7 +583,8 @@ class TestIdPrintingResult(testtools.TestResult): class TestByTestResult(testtools.TestResult): """Call something every time a test completes.""" - # XXX: Arguably belongs in testtools. +# XXX: In testtools since lp:testtools r249. Once that's released, just +# import that. def __init__(self, on_test): """Construct a ``TestByTestResult``. diff --git a/python/subunit/tests/test_subunit_filter.py b/python/subunit/tests/test_subunit_filter.py index 123be08..664fcb3 100644 --- a/python/subunit/tests/test_subunit_filter.py +++ b/python/subunit/tests/test_subunit_filter.py @@ -17,15 +17,18 @@ """Tests for subunit.TestResultFilter.""" from datetime import datetime +import os +import subprocess +import sys from subunit import iso8601 import unittest from testtools import TestCase -from testtools.compat import _b, BytesIO, StringIO +from testtools.compat import _b, BytesIO from testtools.testresult.doubles import ExtendedTestResult import subunit -from subunit.test_results import TestResultFilter +from subunit.test_results import _make_tag_filter, TestResultFilter class TestTestResultFilter(TestCase): @@ -77,6 +80,40 @@ xfail todo filtered_result.failures]) self.assertEqual(4, filtered_result.testsRun) + def test_tag_filter(self): + tag_filter = _make_tag_filter(['global'], ['local']) + result = ExtendedTestResult() + result_filter = TestResultFilter( + result, filter_success=False, filter_predicate=tag_filter) + self.run_tests(result_filter) + tests_included = [ + event[1] for event in result._events if event[0] == 'startTest'] + tests_expected = map( + subunit.RemotedTestCase, + ['passed', 'error', 'skipped', 'todo']) + self.assertEquals(tests_expected, tests_included) + + def test_tags_tracked_correctly(self): + tag_filter = _make_tag_filter(['a'], []) + result = ExtendedTestResult() + result_filter = TestResultFilter( + result, filter_success=False, filter_predicate=tag_filter) + input_stream = ( + "test: foo\n" + "tags: a\n" + "successful: foo\n" + "test: bar\n" + "successful: bar\n") + self.run_tests(result_filter, input_stream) + foo = subunit.RemotedTestCase('foo') + self.assertEquals( + [('startTest', foo), + ('tags', set(['a']), set()), + ('addSuccess', foo), + ('stopTest', foo), + ], + result._events) + def test_exclude_errors(self): filtered_result = unittest.TestResult() result_filter = TestResultFilter(filtered_result, filter_error=True) @@ -151,6 +188,8 @@ xfail todo def test_filter_predicate(self): """You can filter by predicate callbacks""" + # 0.0.7 and earlier did not support the 'tags' parameter, so we need + # to test that we still support behaviour without it. filtered_result = unittest.TestResult() def filter_cb(test, outcome, err, details): return outcome == 'success' @@ -161,6 +200,18 @@ xfail todo # Only success should pass self.assertEqual(1, filtered_result.testsRun) + def test_filter_predicate_with_tags(self): + """You can filter by predicate callbacks that accept tags""" + filtered_result = unittest.TestResult() + def filter_cb(test, outcome, err, details, tags): + return outcome == 'success' + result_filter = TestResultFilter(filtered_result, + filter_predicate=filter_cb, + filter_success=False) + self.run_tests(result_filter) + # Only success should pass + self.assertEqual(1, filtered_result.testsRun) + def test_time_ordering_preserved(self): # Passing a subunit stream through TestResultFilter preserves the # relative ordering of 'time' directives and any other subunit @@ -182,8 +233,8 @@ xfail todo self.maxDiff = None self.assertSequenceEqual( [('time', date_a), - ('time', date_b), ('startTest', foo), + ('time', date_b), ('addError', foo, {}), ('stopTest', foo), ('time', date_c)], result._events) @@ -203,6 +254,86 @@ xfail todo ('stopTest', foo), ], result._events) +class TestFilterCommand(TestCase): + + example_subunit_stream = _b("""\ +tags: global +test passed +success passed +test failed +tags: local +failure failed +test error +error error [ +error details +] +test skipped +skip skipped +test todo +xfail todo +""") + + def run_command(self, args, stream): + root = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) + script_path = os.path.join(root, 'filters', 'subunit-filter') + command = [sys.executable, script_path] + list(args) + ps = subprocess.Popen( + command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = ps.communicate(stream) + if ps.returncode != 0: + raise RuntimeError("%s failed: %s" % (command, err)) + return out + + def to_events(self, stream): + test = subunit.ProtocolTestCase(BytesIO(stream)) + result = ExtendedTestResult() + test.run(result) + return result._events + + def test_default(self): + output = self.run_command([], ( + "test: foo\n" + "skip: foo\n" + )) + events = self.to_events(output) + foo = subunit.RemotedTestCase('foo') + self.assertEqual( + [('startTest', foo), + ('addSkip', foo, {}), + ('stopTest', foo)], + events) + + def test_tags(self): + output = self.run_command(['-s', '--with-tag', 'a'], ( + "tags: a\n" + "test: foo\n" + "success: foo\n" + "tags: -a\n" + "test: bar\n" + "success: bar\n" + "test: baz\n" + "tags: a\n" + "success: baz\n" + )) + events = self.to_events(output) + foo = subunit.RemotedTestCase('foo') + baz = subunit.RemotedTestCase('baz') + self.assertEqual( + [('tags', set(['a']), set()), + ('startTest', foo), + ('addSuccess', foo), + ('stopTest', foo), + ('tags', set(), set(['a'])), + ('startTest', baz), + ('tags', set(['a']), set()), + ('addSuccess', baz), + ('stopTest', baz), + ], + events) + + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() result = loader.loadTestsFromName(__name__) |
