summaryrefslogtreecommitdiff
path: root/python/subunit
diff options
context:
space:
mode:
authorRobert Collins <robertc@robertcollins.net>2012-05-02 21:45:14 +1200
committerRobert Collins <robertc@robertcollins.net>2012-05-02 21:45:14 +1200
commit33b73acea72a1799f42f3fabf2034abf2d114964 (patch)
tree18e77cdde71a65a21e54390f56c1f057f7b25f92 /python/subunit
parentc230e78f9e3f9a464c011b85779700452c647dfb (diff)
parentf42ef2fb24def4c19433e18db492416983c56e32 (diff)
downloadsubunit-git-33b73acea72a1799f42f3fabf2034abf2d114964.tar.gz
* Tags can now be filtered. (Jonathan Lange, #664171)
Diffstat (limited to 'python/subunit')
-rw-r--r--python/subunit/test_results.py296
-rw-r--r--python/subunit/tests/test_subunit_filter.py137
2 files changed, 319 insertions, 114 deletions
diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py
index b800be9..bb67df0 100644
--- a/python/subunit/test_results.py
+++ b/python/subunit/test_results.py
@@ -20,6 +20,7 @@ import csv
import datetime
import testtools
+from testtools.compat import all
from testtools.content import (
text_content,
TracebackContent,
@@ -39,6 +40,9 @@ class TestResultDecorator(object):
or features by degrading them.
"""
+ # XXX: Since lp:testtools r250, this is in testtools. Once it's released,
+ # we should gut this and just use that.
+
def __init__(self, decorated):
"""Create a TestResultDecorator forwarding to decorated."""
# Make every decorator degrade gracefully.
@@ -205,47 +209,44 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
return self.decorated.time(a_datetime)
-class TagCollapsingDecorator(HookedTestResultDecorator):
- """Collapses many 'tags' calls into one where possible."""
+class TagsMixin(object):
- def __init__(self, result):
- super(TagCollapsingDecorator, self).__init__(result)
+ def __init__(self):
self._clear_tags()
def _clear_tags(self):
self._global_tags = set(), set()
- self._current_test_tags = None
-
- def _get_current_tags(self):
- if self._current_test_tags:
- return self._current_test_tags
+ self._test_tags = None
+
+ def _get_active_tags(self):
+ global_new, global_gone = self._global_tags
+ if self._test_tags is None:
+ return set(global_new)
+ test_new, test_gone = self._test_tags
+ return global_new.difference(test_gone).union(test_new)
+
+ def _get_current_scope(self):
+ if self._test_tags:
+ return self._test_tags
return self._global_tags
+ def _flush_current_scope(self, tag_receiver):
+ new_tags, gone_tags = self._get_current_scope()
+ if new_tags or gone_tags:
+ tag_receiver.tags(new_tags, gone_tags)
+ if self._test_tags:
+ self._test_tags = set(), set()
+ else:
+ self._global_tags = set(), set()
+
def startTestRun(self):
- super(TagCollapsingDecorator, self).startTestRun()
self._clear_tags()
def startTest(self, test):
- """Start a test.
-
- Not directly passed to the client, but used for handling of tags
- correctly.
- """
- super(TagCollapsingDecorator, self).startTest(test)
- self._current_test_tags = set(), set()
+ self._test_tags = set(), set()
def stopTest(self, test):
- super(TagCollapsingDecorator, self).stopTest(test)
- self._current_test_tags = None
-
- def _before_event(self):
- new_tags, gone_tags = self._get_current_tags()
- if new_tags or gone_tags:
- self.decorated.tags(new_tags, gone_tags)
- if self._current_test_tags:
- self._current_test_tags = set(), set()
- else:
- self._global_tags = set(), set()
+ self._test_tags = None
def tags(self, new_tags, gone_tags):
"""Handle tag instructions.
@@ -256,13 +257,27 @@ class TagCollapsingDecorator(HookedTestResultDecorator):
:param new_tags: Tags to add,
:param gone_tags: Tags to remove.
"""
- current_new_tags, current_gone_tags = self._get_current_tags()
+ current_new_tags, current_gone_tags = self._get_current_scope()
current_new_tags.update(new_tags)
current_new_tags.difference_update(gone_tags)
current_gone_tags.update(gone_tags)
current_gone_tags.difference_update(new_tags)
+class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin):
+ """Collapses many 'tags' calls into one where possible."""
+
+ def __init__(self, result):
+ super(TagCollapsingDecorator, self).__init__(result)
+ self._clear_tags()
+
+ def _before_event(self):
+ self._flush_current_scope(self.decorated)
+
+ def tags(self, new_tags, gone_tags):
+ TagsMixin.tags(self, new_tags, gone_tags)
+
+
class TimeCollapsingDecorator(HookedTestResultDecorator):
"""Only pass on the first and last of a consecutive sequence of times."""
@@ -288,93 +303,58 @@ class TimeCollapsingDecorator(HookedTestResultDecorator):
self._last_received_time = a_time
-def all_true(bools):
- """Return True if all of 'bools' are True. False otherwise."""
- for b in bools:
- if not b:
- return False
- return True
+def and_predicates(predicates):
+ """Return a predicate that is true iff all predicates are true."""
+ # XXX: Should probably be in testtools to be better used by matchers. jml
+ return lambda *args, **kwargs: all(p(*args, **kwargs) for p in predicates)
-class TestResultFilter(TestResultDecorator):
- """A pyunit TestResult interface implementation which filters tests.
+def _make_tag_filter(with_tags, without_tags):
+ """Make a callback that checks tests against tags."""
- Tests that pass the filter are handed on to another TestResult instance
- for further processing/reporting. To obtain the filtered results,
- the other instance must be interrogated.
+ with_tags = with_tags and set(with_tags) or None
+ without_tags = without_tags and set(without_tags) or None
- :ivar result: The result that tests are passed to after filtering.
- :ivar filter_predicate: The callback run to decide whether to pass
- a result.
- """
+ def check_tags(test, outcome, err, details, tags):
+ if with_tags and not with_tags <= tags:
+ return False
+ if without_tags and bool(without_tags & tags):
+ return False
+ return True
- def __init__(self, result, filter_error=False, filter_failure=False,
- filter_success=True, filter_skip=False, filter_xfail=False,
- filter_predicate=None, fixup_expected_failures=None):
- """Create a FilterResult object filtering to result.
+ return check_tags
- :param filter_error: Filter out errors.
- :param filter_failure: Filter out failures.
- :param filter_success: Filter out successful tests.
- :param filter_skip: Filter out skipped tests.
- :param filter_xfail: Filter out expected failure tests.
- :param filter_predicate: A callable taking (test, outcome, err,
- details) and returning True if the result should be passed
- through. err and details may be none if no error or extra
- metadata is available. outcome is the name of the outcome such
- as 'success' or 'failure'.
- :param fixup_expected_failures: Set of test ids to consider known
- failing.
- """
- super(TestResultFilter, self).__init__(result)
+
+class _PredicateFilter(TestResultDecorator, TagsMixin):
+
+ def __init__(self, result, predicate):
+ super(_PredicateFilter, self).__init__(result)
+ self._clear_tags()
self.decorated = TimeCollapsingDecorator(
TagCollapsingDecorator(self.decorated))
- predicates = []
- if filter_error:
- predicates.append(lambda t, outcome, e, d: outcome != 'error')
- if filter_failure:
- predicates.append(lambda t, outcome, e, d: outcome != 'failure')
- if filter_success:
- predicates.append(lambda t, outcome, e, d: outcome != 'success')
- if filter_skip:
- predicates.append(lambda t, outcome, e, d: outcome != 'skip')
- if filter_xfail:
- predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
- if filter_predicate is not None:
- predicates.append(filter_predicate)
- self.filter_predicate = (
- lambda test, outcome, err, details:
- all_true(p(test, outcome, err, details) for p in predicates))
+ self._predicate = predicate
# The current test (for filtering tags)
self._current_test = None
# Has the current test been filtered (for outputting test tags)
self._current_test_filtered = None
# Calls to this result that we don't know whether to forward on yet.
self._buffered_calls = []
- if fixup_expected_failures is None:
- self._fixup_expected_failures = frozenset()
- else:
- self._fixup_expected_failures = fixup_expected_failures
+
+ def filter_predicate(self, test, outcome, error, details):
+ return self._predicate(
+ test, outcome, error, details, self._get_active_tags())
def addError(self, test, err=None, details=None):
if (self.filter_predicate(test, 'error', err, details)):
- if self._failure_expected(test):
- self._buffered_calls.append(
- ('addExpectedFailure', [test, err], {'details': details}))
- else:
- self._buffered_calls.append(
- ('addError', [test, err], {'details': details}))
+ self._buffered_calls.append(
+ ('addError', [test, err], {'details': details}))
else:
self._filtered()
def addFailure(self, test, err=None, details=None):
if (self.filter_predicate(test, 'failure', err, details)):
- if self._failure_expected(test):
- self._buffered_calls.append(
- ('addExpectedFailure', [test, err], {'details': details}))
- else:
- self._buffered_calls.append(
- ('addFailure', [test, err], {'details': details}))
+ self._buffered_calls.append(
+ ('addFailure', [test, err], {'details': details}))
else:
self._filtered()
@@ -385,17 +365,6 @@ class TestResultFilter(TestResultDecorator):
else:
self._filtered()
- def addSuccess(self, test, details=None):
- if (self.filter_predicate(test, 'success', None, details)):
- if self._failure_expected(test):
- self._buffered_calls.append(
- ('addUnexpectedSuccess', [test], {'details': details}))
- else:
- self._buffered_calls.append(
- ('addSuccess', [test], {'details': details}))
- else:
- self._filtered()
-
def addExpectedFailure(self, test, err=None, details=None):
if self.filter_predicate(test, 'expectedfailure', err, details):
self._buffered_calls.append(
@@ -407,18 +376,23 @@ class TestResultFilter(TestResultDecorator):
self._buffered_calls.append(
('addUnexpectedSuccess', [test], {'details': details}))
+ def addSuccess(self, test, details=None):
+ if (self.filter_predicate(test, 'success', None, details)):
+ self._buffered_calls.append(
+ ('addSuccess', [test], {'details': details}))
+ else:
+ self._filtered()
+
def _filtered(self):
self._current_test_filtered = True
- def _failure_expected(self, test):
- return (test.id() in self._fixup_expected_failures)
-
def startTest(self, test):
"""Start a test.
Not directly passed to the client, but used for handling of tags
correctly.
"""
+ TagsMixin.startTest(self, test)
self._current_test = test
self._current_test_filtered = False
self._buffered_calls.append(('startTest', [test], {}))
@@ -430,16 +404,26 @@ class TestResultFilter(TestResultDecorator):
correctly.
"""
if not self._current_test_filtered:
- # Tags to output for this test.
for method, args, kwargs in self._buffered_calls:
getattr(self.decorated, method)(*args, **kwargs)
self.decorated.stopTest(test)
self._current_test = None
self._current_test_filtered = None
self._buffered_calls = []
+ TagsMixin.stopTest(self, test)
+
+ def tags(self, new_tags, gone_tags):
+ TagsMixin.tags(self, new_tags, gone_tags)
+ if self._current_test is not None:
+ self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))
+ else:
+ return super(_PredicateFilter, self).tags(new_tags, gone_tags)
def time(self, a_time):
- return self.decorated.time(a_time)
+ if self._current_test is not None:
+ self._buffered_calls.append(('time', [a_time], {}))
+ else:
+ return self.decorated.time(a_time)
def id_to_orig_id(self, id):
if id.startswith("subunit.RemotedTestCase."):
@@ -447,6 +431,95 @@ class TestResultFilter(TestResultDecorator):
return id
+class TestResultFilter(TestResultDecorator):
+ """A pyunit TestResult interface implementation which filters tests.
+
+ Tests that pass the filter are handed on to another TestResult instance
+ for further processing/reporting. To obtain the filtered results,
+ the other instance must be interrogated.
+
+ :ivar result: The result that tests are passed to after filtering.
+ :ivar filter_predicate: The callback run to decide whether to pass
+ a result.
+ """
+
+ def __init__(self, result, filter_error=False, filter_failure=False,
+ filter_success=True, filter_skip=False, filter_xfail=False,
+ filter_predicate=None, fixup_expected_failures=None):
+ """Create a FilterResult object filtering to result.
+
+ :param filter_error: Filter out errors.
+ :param filter_failure: Filter out failures.
+ :param filter_success: Filter out successful tests.
+ :param filter_skip: Filter out skipped tests.
+ :param filter_xfail: Filter out expected failure tests.
+ :param filter_predicate: A callable taking (test, outcome, err,
+ details, tags) and returning True if the result should be passed
+ through. err and details may be none if no error or extra
+ metadata is available. outcome is the name of the outcome such
+ as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters
+ are still supported but should be updated to accept the tags
+ parameter for efficiency.
+ :param fixup_expected_failures: Set of test ids to consider known
+ failing.
+ """
+ predicates = []
+ if filter_error:
+ predicates.append(
+ lambda t, outcome, e, d, tags: outcome != 'error')
+ if filter_failure:
+ predicates.append(
+ lambda t, outcome, e, d, tags: outcome != 'failure')
+ if filter_success:
+ predicates.append(
+ lambda t, outcome, e, d, tags: outcome != 'success')
+ if filter_skip:
+ predicates.append(
+ lambda t, outcome, e, d, tags: outcome != 'skip')
+ if filter_xfail:
+ predicates.append(
+ lambda t, outcome, e, d, tags: outcome != 'expectedfailure')
+ if filter_predicate is not None:
+ def compat(test, outcome, error, details, tags):
+ # 0.0.7 and earlier did not support the 'tags' parameter.
+ try:
+ return filter_predicate(
+ test, outcome, error, details, tags)
+ except TypeError:
+ return filter_predicate(test, outcome, error, details)
+ predicates.append(compat)
+ predicate = and_predicates(predicates)
+ super(TestResultFilter, self).__init__(
+ _PredicateFilter(result, predicate))
+ if fixup_expected_failures is None:
+ self._fixup_expected_failures = frozenset()
+ else:
+ self._fixup_expected_failures = fixup_expected_failures
+
+ def addError(self, test, err=None, details=None):
+ if self._failure_expected(test):
+ self.addExpectedFailure(test, err=err, details=details)
+ else:
+ super(TestResultFilter, self).addError(
+ test, err=err, details=details)
+
+ def addFailure(self, test, err=None, details=None):
+ if self._failure_expected(test):
+ self.addExpectedFailure(test, err=err, details=details)
+ else:
+ super(TestResultFilter, self).addFailure(
+ test, err=err, details=details)
+
+ def addSuccess(self, test, details=None):
+ if self._failure_expected(test):
+ self.addUnexpectedSuccess(test, details=details)
+ else:
+ super(TestResultFilter, self).addSuccess(test, details=details)
+
+ def _failure_expected(self, test):
+ return (test.id() in self._fixup_expected_failures)
+
+
class TestIdPrintingResult(testtools.TestResult):
def __init__(self, stream, show_times=False):
@@ -510,7 +583,8 @@ class TestIdPrintingResult(testtools.TestResult):
class TestByTestResult(testtools.TestResult):
"""Call something every time a test completes."""
- # XXX: Arguably belongs in testtools.
+# XXX: In testtools since lp:testtools r249. Once that's released, just
+# import that.
def __init__(self, on_test):
"""Construct a ``TestByTestResult``.
diff --git a/python/subunit/tests/test_subunit_filter.py b/python/subunit/tests/test_subunit_filter.py
index 123be08..664fcb3 100644
--- a/python/subunit/tests/test_subunit_filter.py
+++ b/python/subunit/tests/test_subunit_filter.py
@@ -17,15 +17,18 @@
"""Tests for subunit.TestResultFilter."""
from datetime import datetime
+import os
+import subprocess
+import sys
from subunit import iso8601
import unittest
from testtools import TestCase
-from testtools.compat import _b, BytesIO, StringIO
+from testtools.compat import _b, BytesIO
from testtools.testresult.doubles import ExtendedTestResult
import subunit
-from subunit.test_results import TestResultFilter
+from subunit.test_results import _make_tag_filter, TestResultFilter
class TestTestResultFilter(TestCase):
@@ -77,6 +80,40 @@ xfail todo
filtered_result.failures])
self.assertEqual(4, filtered_result.testsRun)
+ def test_tag_filter(self):
+ tag_filter = _make_tag_filter(['global'], ['local'])
+ result = ExtendedTestResult()
+ result_filter = TestResultFilter(
+ result, filter_success=False, filter_predicate=tag_filter)
+ self.run_tests(result_filter)
+ tests_included = [
+ event[1] for event in result._events if event[0] == 'startTest']
+ tests_expected = map(
+ subunit.RemotedTestCase,
+ ['passed', 'error', 'skipped', 'todo'])
+ self.assertEquals(tests_expected, tests_included)
+
+ def test_tags_tracked_correctly(self):
+ tag_filter = _make_tag_filter(['a'], [])
+ result = ExtendedTestResult()
+ result_filter = TestResultFilter(
+ result, filter_success=False, filter_predicate=tag_filter)
+ input_stream = (
+ "test: foo\n"
+ "tags: a\n"
+ "successful: foo\n"
+ "test: bar\n"
+ "successful: bar\n")
+ self.run_tests(result_filter, input_stream)
+ foo = subunit.RemotedTestCase('foo')
+ self.assertEquals(
+ [('startTest', foo),
+ ('tags', set(['a']), set()),
+ ('addSuccess', foo),
+ ('stopTest', foo),
+ ],
+ result._events)
+
def test_exclude_errors(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result, filter_error=True)
@@ -151,6 +188,8 @@ xfail todo
def test_filter_predicate(self):
"""You can filter by predicate callbacks"""
+ # 0.0.7 and earlier did not support the 'tags' parameter, so we need
+ # to test that we still support behaviour without it.
filtered_result = unittest.TestResult()
def filter_cb(test, outcome, err, details):
return outcome == 'success'
@@ -161,6 +200,18 @@ xfail todo
# Only success should pass
self.assertEqual(1, filtered_result.testsRun)
+ def test_filter_predicate_with_tags(self):
+ """You can filter by predicate callbacks that accept tags"""
+ filtered_result = unittest.TestResult()
+ def filter_cb(test, outcome, err, details, tags):
+ return outcome == 'success'
+ result_filter = TestResultFilter(filtered_result,
+ filter_predicate=filter_cb,
+ filter_success=False)
+ self.run_tests(result_filter)
+ # Only success should pass
+ self.assertEqual(1, filtered_result.testsRun)
+
def test_time_ordering_preserved(self):
# Passing a subunit stream through TestResultFilter preserves the
# relative ordering of 'time' directives and any other subunit
@@ -182,8 +233,8 @@ xfail todo
self.maxDiff = None
self.assertSequenceEqual(
[('time', date_a),
- ('time', date_b),
('startTest', foo),
+ ('time', date_b),
('addError', foo, {}),
('stopTest', foo),
('time', date_c)], result._events)
@@ -203,6 +254,86 @@ xfail todo
('stopTest', foo), ], result._events)
+class TestFilterCommand(TestCase):
+
+ example_subunit_stream = _b("""\
+tags: global
+test passed
+success passed
+test failed
+tags: local
+failure failed
+test error
+error error [
+error details
+]
+test skipped
+skip skipped
+test todo
+xfail todo
+""")
+
+ def run_command(self, args, stream):
+ root = os.path.dirname(
+ os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
+ script_path = os.path.join(root, 'filters', 'subunit-filter')
+ command = [sys.executable, script_path] + list(args)
+ ps = subprocess.Popen(
+ command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ out, err = ps.communicate(stream)
+ if ps.returncode != 0:
+ raise RuntimeError("%s failed: %s" % (command, err))
+ return out
+
+ def to_events(self, stream):
+ test = subunit.ProtocolTestCase(BytesIO(stream))
+ result = ExtendedTestResult()
+ test.run(result)
+ return result._events
+
+ def test_default(self):
+ output = self.run_command([], (
+ "test: foo\n"
+ "skip: foo\n"
+ ))
+ events = self.to_events(output)
+ foo = subunit.RemotedTestCase('foo')
+ self.assertEqual(
+ [('startTest', foo),
+ ('addSkip', foo, {}),
+ ('stopTest', foo)],
+ events)
+
+ def test_tags(self):
+ output = self.run_command(['-s', '--with-tag', 'a'], (
+ "tags: a\n"
+ "test: foo\n"
+ "success: foo\n"
+ "tags: -a\n"
+ "test: bar\n"
+ "success: bar\n"
+ "test: baz\n"
+ "tags: a\n"
+ "success: baz\n"
+ ))
+ events = self.to_events(output)
+ foo = subunit.RemotedTestCase('foo')
+ baz = subunit.RemotedTestCase('baz')
+ self.assertEqual(
+ [('tags', set(['a']), set()),
+ ('startTest', foo),
+ ('addSuccess', foo),
+ ('stopTest', foo),
+ ('tags', set(), set(['a'])),
+ ('startTest', baz),
+ ('tags', set(['a']), set()),
+ ('addSuccess', baz),
+ ('stopTest', baz),
+ ],
+ events)
+
+
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)