summaryrefslogtreecommitdiff
path: root/python/subunit/tests
diff options
context:
space:
mode:
authorJonathan Lange <jml@canonical.com>2012-02-01 18:46:25 +0000
committerJonathan Lange <jml@canonical.com>2012-02-01 18:46:25 +0000
commitc4924ff01abc2805a137c27de7ea18747abdcd04 (patch)
treeebacc12013ef6084014ebbddbb73f86a5c00f24c /python/subunit/tests
parent6033d5b90991de13604f7de3ed6e42581b01d2c2 (diff)
parent29021cef8d10e194c1229b73c5388502b8c8aa74 (diff)
downloadsubunit-git-c4924ff01abc2805a137c27de7ea18747abdcd04.tar.gz
Merge trunk
Diffstat (limited to 'python/subunit/tests')
-rw-r--r--python/subunit/tests/TestUtil.py2
-rw-r--r--python/subunit/tests/__init__.py2
-rwxr-xr-xpython/subunit/tests/sample-script.py3
-rw-r--r--python/subunit/tests/test_chunked.py109
-rw-r--r--python/subunit/tests/test_details.py45
-rw-r--r--python/subunit/tests/test_run.py52
-rw-r--r--python/subunit/tests/test_subunit_filter.py218
-rw-r--r--python/subunit/tests/test_subunit_stats.py9
-rw-r--r--python/subunit/tests/test_subunit_tags.py3
-rw-r--r--python/subunit/tests/test_tap2subunit.py4
-rw-r--r--python/subunit/tests/test_test_protocol.py584
-rw-r--r--python/subunit/tests/test_test_results.py121
12 files changed, 781 insertions, 371 deletions
diff --git a/python/subunit/tests/TestUtil.py b/python/subunit/tests/TestUtil.py
index 1b5ba9c..39d901e 100644
--- a/python/subunit/tests/TestUtil.py
+++ b/python/subunit/tests/TestUtil.py
@@ -53,7 +53,7 @@ def visitTests(suite, visitor):
visitor.visitSuite(test)
visitTests(test, visitor)
else:
- print "unvisitable non-unittest.TestCase element %r (%r)" % (test, test.__class__)
+ print ("unvisitable non-unittest.TestCase element %r (%r)" % (test, test.__class__))
class TestSuite(unittest.TestSuite):
diff --git a/python/subunit/tests/__init__.py b/python/subunit/tests/__init__.py
index a78cec8..e0e1eb1 100644
--- a/python/subunit/tests/__init__.py
+++ b/python/subunit/tests/__init__.py
@@ -19,6 +19,7 @@ from subunit.tests import (
test_chunked,
test_details,
test_progress_model,
+ test_run,
test_subunit_filter,
test_subunit_stats,
test_subunit_tags,
@@ -38,4 +39,5 @@ def test_suite():
result.addTest(test_subunit_filter.test_suite())
result.addTest(test_subunit_tags.test_suite())
result.addTest(test_subunit_stats.test_suite())
+ result.addTest(test_run.test_suite())
return result
diff --git a/python/subunit/tests/sample-script.py b/python/subunit/tests/sample-script.py
index 0ee019a..618e495 100755
--- a/python/subunit/tests/sample-script.py
+++ b/python/subunit/tests/sample-script.py
@@ -1,5 +1,8 @@
#!/usr/bin/env python
import sys
+if sys.platform == "win32":
+ import msvcrt, os
+ msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
if len(sys.argv) == 2:
# subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args
# uses this code path to be sure that the arguments were passed to
diff --git a/python/subunit/tests/test_chunked.py b/python/subunit/tests/test_chunked.py
index a24e31e..e0742f1 100644
--- a/python/subunit/tests/test_chunked.py
+++ b/python/subunit/tests/test_chunked.py
@@ -1,6 +1,7 @@
#
# subunit: extensions to python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
+# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net>
#
# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
# license at the users choice. A copy of both licenses are available in the
@@ -14,9 +15,10 @@
# limitations under that license.
#
-from cStringIO import StringIO
import unittest
+from testtools.compat import _b, BytesIO
+
import subunit.chunked
@@ -30,98 +32,121 @@ class TestDecode(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
- self.output = StringIO()
+ self.output = BytesIO()
self.decoder = subunit.chunked.Decoder(self.output)
def test_close_read_length_short_errors(self):
self.assertRaises(ValueError, self.decoder.close)
def test_close_body_short_errors(self):
- self.assertEqual(None, self.decoder.write('2\r\na'))
+ self.assertEqual(None, self.decoder.write(_b('2\r\na')))
self.assertRaises(ValueError, self.decoder.close)
def test_close_body_buffered_data_errors(self):
- self.assertEqual(None, self.decoder.write('2\r'))
+ self.assertEqual(None, self.decoder.write(_b('2\r')))
self.assertRaises(ValueError, self.decoder.close)
def test_close_after_finished_stream_safe(self):
- self.assertEqual(None, self.decoder.write('2\r\nab'))
- self.assertEqual('', self.decoder.write('0\r\n'))
+ self.assertEqual(None, self.decoder.write(_b('2\r\nab')))
+ self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
self.decoder.close()
def test_decode_nothing(self):
- self.assertEqual('', self.decoder.write('0\r\n'))
- self.assertEqual('', self.output.getvalue())
+ self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
+ self.assertEqual(_b(''), self.output.getvalue())
def test_decode_serialised_form(self):
- self.assertEqual(None, self.decoder.write("F\r\n"))
- self.assertEqual(None, self.decoder.write("serialised\n"))
- self.assertEqual('', self.decoder.write("form0\r\n"))
+ self.assertEqual(None, self.decoder.write(_b("F\r\n")))
+ self.assertEqual(None, self.decoder.write(_b("serialised\n")))
+ self.assertEqual(_b(''), self.decoder.write(_b("form0\r\n")))
def test_decode_short(self):
- self.assertEqual('', self.decoder.write('3\r\nabc0\r\n'))
- self.assertEqual('abc', self.output.getvalue())
+ self.assertEqual(_b(''), self.decoder.write(_b('3\r\nabc0\r\n')))
+ self.assertEqual(_b('abc'), self.output.getvalue())
def test_decode_combines_short(self):
- self.assertEqual('', self.decoder.write('6\r\nabcdef0\r\n'))
- self.assertEqual('abcdef', self.output.getvalue())
+ self.assertEqual(_b(''), self.decoder.write(_b('6\r\nabcdef0\r\n')))
+ self.assertEqual(_b('abcdef'), self.output.getvalue())
def test_decode_excess_bytes_from_write(self):
- self.assertEqual('1234', self.decoder.write('3\r\nabc0\r\n1234'))
- self.assertEqual('abc', self.output.getvalue())
+ self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234')))
+ self.assertEqual(_b('abc'), self.output.getvalue())
def test_decode_write_after_finished_errors(self):
- self.assertEqual('1234', self.decoder.write('3\r\nabc0\r\n1234'))
- self.assertRaises(ValueError, self.decoder.write, '')
+ self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234')))
+ self.assertRaises(ValueError, self.decoder.write, _b(''))
def test_decode_hex(self):
- self.assertEqual('', self.decoder.write('A\r\n12345678900\r\n'))
- self.assertEqual('1234567890', self.output.getvalue())
+ self.assertEqual(_b(''), self.decoder.write(_b('A\r\n12345678900\r\n')))
+ self.assertEqual(_b('1234567890'), self.output.getvalue())
def test_decode_long_ranges(self):
- self.assertEqual(None, self.decoder.write('10000\r\n'))
- self.assertEqual(None, self.decoder.write('1' * 65536))
- self.assertEqual(None, self.decoder.write('10000\r\n'))
- self.assertEqual(None, self.decoder.write('2' * 65536))
- self.assertEqual('', self.decoder.write('0\r\n'))
- self.assertEqual('1' * 65536 + '2' * 65536, self.output.getvalue())
+ self.assertEqual(None, self.decoder.write(_b('10000\r\n')))
+ self.assertEqual(None, self.decoder.write(_b('1' * 65536)))
+ self.assertEqual(None, self.decoder.write(_b('10000\r\n')))
+ self.assertEqual(None, self.decoder.write(_b('2' * 65536)))
+ self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
+ self.assertEqual(_b('1' * 65536 + '2' * 65536), self.output.getvalue())
+
+ def test_decode_newline_nonstrict(self):
+ """Tolerate chunk markers with no CR character."""
+ # From <http://pad.lv/505078>
+ self.decoder = subunit.chunked.Decoder(self.output, strict=False)
+ self.assertEqual(None, self.decoder.write(_b('a\n')))
+ self.assertEqual(None, self.decoder.write(_b('abcdeabcde')))
+ self.assertEqual(_b(''), self.decoder.write(_b('0\n')))
+ self.assertEqual(_b('abcdeabcde'), self.output.getvalue())
+
+ def test_decode_strict_newline_only(self):
+ """Reject chunk markers with no CR character in strict mode."""
+ # From <http://pad.lv/505078>
+ self.assertRaises(ValueError,
+ self.decoder.write, _b('a\n'))
+
+ def test_decode_strict_multiple_crs(self):
+ self.assertRaises(ValueError,
+ self.decoder.write, _b('a\r\r\n'))
+
+ def test_decode_short_header(self):
+ self.assertRaises(ValueError,
+ self.decoder.write, _b('\n'))
class TestEncode(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
- self.output = StringIO()
+ self.output = BytesIO()
self.encoder = subunit.chunked.Encoder(self.output)
def test_encode_nothing(self):
self.encoder.close()
- self.assertEqual('0\r\n', self.output.getvalue())
+ self.assertEqual(_b('0\r\n'), self.output.getvalue())
def test_encode_empty(self):
- self.encoder.write('')
+ self.encoder.write(_b(''))
self.encoder.close()
- self.assertEqual('0\r\n', self.output.getvalue())
+ self.assertEqual(_b('0\r\n'), self.output.getvalue())
def test_encode_short(self):
- self.encoder.write('abc')
+ self.encoder.write(_b('abc'))
self.encoder.close()
- self.assertEqual('3\r\nabc0\r\n', self.output.getvalue())
+ self.assertEqual(_b('3\r\nabc0\r\n'), self.output.getvalue())
def test_encode_combines_short(self):
- self.encoder.write('abc')
- self.encoder.write('def')
+ self.encoder.write(_b('abc'))
+ self.encoder.write(_b('def'))
self.encoder.close()
- self.assertEqual('6\r\nabcdef0\r\n', self.output.getvalue())
+ self.assertEqual(_b('6\r\nabcdef0\r\n'), self.output.getvalue())
def test_encode_over_9_is_in_hex(self):
- self.encoder.write('1234567890')
+ self.encoder.write(_b('1234567890'))
self.encoder.close()
- self.assertEqual('A\r\n12345678900\r\n', self.output.getvalue())
+ self.assertEqual(_b('A\r\n12345678900\r\n'), self.output.getvalue())
def test_encode_long_ranges_not_combined(self):
- self.encoder.write('1' * 65536)
- self.encoder.write('2' * 65536)
+ self.encoder.write(_b('1' * 65536))
+ self.encoder.write(_b('2' * 65536))
self.encoder.close()
- self.assertEqual('10000\r\n' + '1' * 65536 + '10000\r\n' +
- '2' * 65536 + '0\r\n', self.output.getvalue())
+ self.assertEqual(_b('10000\r\n' + '1' * 65536 + '10000\r\n' +
+ '2' * 65536 + '0\r\n'), self.output.getvalue())
diff --git a/python/subunit/tests/test_details.py b/python/subunit/tests/test_details.py
index 41c3212..746aa04 100644
--- a/python/subunit/tests/test_details.py
+++ b/python/subunit/tests/test_details.py
@@ -14,9 +14,10 @@
# limitations under that license.
#
-from cStringIO import StringIO
import unittest
+from testtools.compat import _b, StringIO
+
import subunit.tests
from subunit import content, content_type, details
@@ -31,20 +32,20 @@ class TestSimpleDetails(unittest.TestCase):
def test_lineReceived(self):
parser = details.SimpleDetailsParser(None)
- parser.lineReceived("foo\n")
- parser.lineReceived("bar\n")
- self.assertEqual("foo\nbar\n", parser._message)
+ parser.lineReceived(_b("foo\n"))
+ parser.lineReceived(_b("bar\n"))
+ self.assertEqual(_b("foo\nbar\n"), parser._message)
def test_lineReceived_escaped_bracket(self):
parser = details.SimpleDetailsParser(None)
- parser.lineReceived("foo\n")
- parser.lineReceived(" ]are\n")
- parser.lineReceived("bar\n")
- self.assertEqual("foo\n]are\nbar\n", parser._message)
+ parser.lineReceived(_b("foo\n"))
+ parser.lineReceived(_b(" ]are\n"))
+ parser.lineReceived(_b("bar\n"))
+ self.assertEqual(_b("foo\n]are\nbar\n"), parser._message)
def test_get_message(self):
parser = details.SimpleDetailsParser(None)
- self.assertEqual("", parser.get_message())
+ self.assertEqual(_b(""), parser.get_message())
def test_get_details(self):
parser = details.SimpleDetailsParser(None)
@@ -53,13 +54,13 @@ class TestSimpleDetails(unittest.TestCase):
expected['traceback'] = content.Content(
content_type.ContentType("text", "x-traceback",
{'charset': 'utf8'}),
- lambda:[""])
+ lambda:[_b("")])
found = parser.get_details()
self.assertEqual(expected.keys(), found.keys())
self.assertEqual(expected['traceback'].content_type,
found['traceback'].content_type)
- self.assertEqual(''.join(expected['traceback'].iter_bytes()),
- ''.join(found['traceback'].iter_bytes()))
+ self.assertEqual(_b('').join(expected['traceback'].iter_bytes()),
+ _b('').join(found['traceback'].iter_bytes()))
def test_get_details_skip(self):
parser = details.SimpleDetailsParser(None)
@@ -67,7 +68,7 @@ class TestSimpleDetails(unittest.TestCase):
expected = {}
expected['reason'] = content.Content(
content_type.ContentType("text", "plain"),
- lambda:[""])
+ lambda:[_b("")])
found = parser.get_details("skip")
self.assertEqual(expected, found)
@@ -77,7 +78,7 @@ class TestSimpleDetails(unittest.TestCase):
expected = {}
expected['message'] = content.Content(
content_type.ContentType("text", "plain"),
- lambda:[""])
+ lambda:[_b("")])
found = parser.get_details("success")
self.assertEqual(expected, found)
@@ -94,18 +95,18 @@ class TestMultipartDetails(unittest.TestCase):
def test_parts(self):
parser = details.MultipartDetailsParser(None)
- parser.lineReceived("Content-Type: text/plain\n")
- parser.lineReceived("something\n")
- parser.lineReceived("F\r\n")
- parser.lineReceived("serialised\n")
- parser.lineReceived("form0\r\n")
+ parser.lineReceived(_b("Content-Type: text/plain\n"))
+ parser.lineReceived(_b("something\n"))
+ parser.lineReceived(_b("F\r\n"))
+ parser.lineReceived(_b("serialised\n"))
+ parser.lineReceived(_b("form0\r\n"))
expected = {}
expected['something'] = content.Content(
content_type.ContentType("text", "plain"),
- lambda:["serialised\nform"])
+ lambda:[_b("serialised\nform")])
found = parser.get_details()
self.assertEqual(expected.keys(), found.keys())
self.assertEqual(expected['something'].content_type,
found['something'].content_type)
- self.assertEqual(''.join(expected['something'].iter_bytes()),
- ''.join(found['something'].iter_bytes()))
+ self.assertEqual(_b('').join(expected['something'].iter_bytes()),
+ _b('').join(found['something'].iter_bytes()))
diff --git a/python/subunit/tests/test_run.py b/python/subunit/tests/test_run.py
new file mode 100644
index 0000000..5a96bcf
--- /dev/null
+++ b/python/subunit/tests/test_run.py
@@ -0,0 +1,52 @@
+#
+# subunit: extensions to python unittest to get test results from subprocesses.
+# Copyright (C) 2011 Robert Collins <robertc@robertcollins.net>
+#
+# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
+# license at the users choice. A copy of both licenses are available in the
+# project source as Apache-2.0 and BSD. You may not use this file except in
+# compliance with one of these two licences.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# license you chose for the specific language governing permissions and
+# limitations under that license.
+#
+
+from cStringIO import StringIO
+import unittest
+
+from testtools import PlaceHolder
+
+import subunit
+from subunit.run import SubunitTestRunner
+
+
+def test_suite():
+ loader = subunit.tests.TestUtil.TestLoader()
+ result = loader.loadTestsFromName(__name__)
+ return result
+
+
+class TimeCollectingTestResult(unittest.TestResult):
+
+ def __init__(self, *args, **kwargs):
+ super(TimeCollectingTestResult, self).__init__(*args, **kwargs)
+ self.time_called = []
+
+ def time(self, a_time):
+ self.time_called.append(a_time)
+
+
+class TestSubunitTestRunner(unittest.TestCase):
+
+ def test_includes_timing_output(self):
+ io = StringIO()
+ runner = SubunitTestRunner(stream=io)
+ test = PlaceHolder('name')
+ runner.run(test)
+ client = TimeCollectingTestResult()
+ io.seek(0)
+ subunit.TestProtocolServer(client).readFrom(io)
+ self.assertTrue(len(client.time_called) > 0)
diff --git a/python/subunit/tests/test_subunit_filter.py b/python/subunit/tests/test_subunit_filter.py
index 3c65ed3..0675484 100644
--- a/python/subunit/tests/test_subunit_filter.py
+++ b/python/subunit/tests/test_subunit_filter.py
@@ -6,7 +6,7 @@
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -16,119 +16,191 @@
"""Tests for subunit.TestResultFilter."""
+from datetime import datetime
+from subunit import iso8601
import unittest
-from StringIO import StringIO
+
+from testtools import TestCase
+from testtools.compat import _b, BytesIO, StringIO
+from testtools.testresult.doubles import ExtendedTestResult
import subunit
from subunit.test_results import TestResultFilter
-class TestTestResultFilter(unittest.TestCase):
+class TestTestResultFilter(TestCase):
"""Test for TestResultFilter, a TestResult object which filters tests."""
- def _setUp(self):
- self.output = StringIO()
+ # While TestResultFilter works on python objects, using a subunit stream
+ # is an easy pithy way of getting a series of test objects to call into
+ # the TestResult, and as TestResultFilter is intended for use with subunit
+ # also has the benefit of detecting any interface skew issues.
+ example_subunit_stream = _b("""\
+tags: global
+test passed
+success passed
+test failed
+tags: local
+failure failed
+test error
+error error [
+error details
+]
+test skipped
+skip skipped
+test todo
+xfail todo
+""")
+
+ def run_tests(self, result_filter, input_stream=None):
+ """Run tests through the given filter.
+
+ :param result_filter: A filtering TestResult object.
+ :param input_stream: Bytes of subunit stream data. If not provided,
+ uses TestTestResultFilter.example_subunit_stream.
+ """
+ if input_stream is None:
+ input_stream = self.example_subunit_stream
+ test = subunit.ProtocolTestCase(BytesIO(input_stream))
+ test.run(result_filter)
def test_default(self):
"""The default is to exclude success and include everything else."""
- self.filtered_result = unittest.TestResult()
- self.filter = TestResultFilter(self.filtered_result)
- self.run_tests()
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result)
+ self.run_tests(result_filter)
# skips are seen as success by default python TestResult.
self.assertEqual(['error'],
- [error[0].id() for error in self.filtered_result.errors])
+ [error[0].id() for error in filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
- self.filtered_result.failures])
- self.assertEqual(4, self.filtered_result.testsRun)
+ filtered_result.failures])
+ self.assertEqual(4, filtered_result.testsRun)
def test_exclude_errors(self):
- self.filtered_result = unittest.TestResult()
- self.filter = TestResultFilter(self.filtered_result,
- filter_error=True)
- self.run_tests()
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result, filter_error=True)
+ self.run_tests(result_filter)
# skips are seen as errors by default python TestResult.
- self.assertEqual([], self.filtered_result.errors)
+ self.assertEqual([], filtered_result.errors)
self.assertEqual(['failed'],
[failure[0].id() for failure in
- self.filtered_result.failures])
- self.assertEqual(3, self.filtered_result.testsRun)
+ filtered_result.failures])
+ self.assertEqual(3, filtered_result.testsRun)
+
+ def test_fixup_expected_failures(self):
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result,
+ fixup_expected_failures=set(["failed"]))
+ self.run_tests(result_filter)
+ self.assertEqual(['failed', 'todo'],
+ [failure[0].id() for failure in filtered_result.expectedFailures])
+ self.assertEqual([], filtered_result.failures)
+ self.assertEqual(4, filtered_result.testsRun)
+
+ def test_fixup_expected_errors(self):
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result,
+ fixup_expected_failures=set(["error"]))
+ self.run_tests(result_filter)
+ self.assertEqual(['error', 'todo'],
+ [failure[0].id() for failure in filtered_result.expectedFailures])
+ self.assertEqual([], filtered_result.errors)
+ self.assertEqual(4, filtered_result.testsRun)
+
+ def test_fixup_unexpected_success(self):
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result, filter_success=False,
+ fixup_expected_failures=set(["passed"]))
+ self.run_tests(result_filter)
+ self.assertEqual(['passed'],
+ [passed.id() for passed in filtered_result.unexpectedSuccesses])
+ self.assertEqual(5, filtered_result.testsRun)
def test_exclude_failure(self):
- self.filtered_result = unittest.TestResult()
- self.filter = TestResultFilter(self.filtered_result,
- filter_failure=True)
- self.run_tests()
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result, filter_failure=True)
+ self.run_tests(result_filter)
self.assertEqual(['error'],
- [error[0].id() for error in self.filtered_result.errors])
+ [error[0].id() for error in filtered_result.errors])
self.assertEqual([],
[failure[0].id() for failure in
- self.filtered_result.failures])
- self.assertEqual(3, self.filtered_result.testsRun)
+ filtered_result.failures])
+ self.assertEqual(3, filtered_result.testsRun)
def test_exclude_skips(self):
- self.filtered_result = subunit.TestResultStats(None)
- self.filter = TestResultFilter(self.filtered_result,
- filter_skip=True)
- self.run_tests()
- self.assertEqual(0, self.filtered_result.skipped_tests)
- self.assertEqual(2, self.filtered_result.failed_tests)
- self.assertEqual(3, self.filtered_result.testsRun)
+ filtered_result = subunit.TestResultStats(None)
+ result_filter = TestResultFilter(filtered_result, filter_skip=True)
+ self.run_tests(result_filter)
+ self.assertEqual(0, filtered_result.skipped_tests)
+ self.assertEqual(2, filtered_result.failed_tests)
+ self.assertEqual(3, filtered_result.testsRun)
def test_include_success(self):
- """Success's can be included if requested."""
- self.filtered_result = unittest.TestResult()
- self.filter = TestResultFilter(self.filtered_result,
+ """Successes can be included if requested."""
+ filtered_result = unittest.TestResult()
+ result_filter = TestResultFilter(filtered_result,
filter_success=False)
- self.run_tests()
+ self.run_tests(result_filter)
self.assertEqual(['error'],
- [error[0].id() for error in self.filtered_result.errors])
+ [error[0].id() for error in filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
- self.filtered_result.failures])
- self.assertEqual(5, self.filtered_result.testsRun)
+ filtered_result.failures])
+ self.assertEqual(5, filtered_result.testsRun)
def test_filter_predicate(self):
"""You can filter by predicate callbacks"""
- self.filtered_result = unittest.TestResult()
+ filtered_result = unittest.TestResult()
def filter_cb(test, outcome, err, details):
return outcome == 'success'
- self.filter = TestResultFilter(self.filtered_result,
+ result_filter = TestResultFilter(filtered_result,
filter_predicate=filter_cb,
filter_success=False)
- self.run_tests()
+ self.run_tests(result_filter)
# Only success should pass
- self.assertEqual(1, self.filtered_result.testsRun)
-
- def run_tests(self):
- self.setUpTestStream()
- self.test = subunit.ProtocolTestCase(self.input_stream)
- self.test.run(self.filter)
-
- def setUpTestStream(self):
- # While TestResultFilter works on python objects, using a subunit
- # stream is an easy pithy way of getting a series of test objects to
- # call into the TestResult, and as TestResultFilter is intended for
- # use with subunit also has the benefit of detecting any interface
- # skew issues.
- self.input_stream = StringIO()
- self.input_stream.write("""tags: global
-test passed
-success passed
-test failed
-tags: local
-failure failed
-test error
-error error [
-error details
-]
-test skipped
-skip skipped
-test todo
-xfail todo
-""")
- self.input_stream.seek(0)
-
+ self.assertEqual(1, filtered_result.testsRun)
+
+ def test_time_ordering_preserved(self):
+ # Passing a subunit stream through TestResultFilter preserves the
+ # relative ordering of 'time' directives and any other subunit
+ # directives that are still included.
+ date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
+ date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
+ date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
+ subunit_stream = _b('\n'.join([
+ "time: %s",
+ "test: foo",
+ "time: %s",
+ "error: foo",
+ "time: %s",
+ ""]) % (date_a, date_b, date_c))
+ result = ExtendedTestResult()
+ result_filter = TestResultFilter(result)
+ self.run_tests(result_filter, subunit_stream)
+ foo = subunit.RemotedTestCase('foo')
+ self.assertEquals(
+ [('time', date_a),
+ ('startTest', foo),
+ ('time', date_b),
+ ('addError', foo, {}),
+ ('stopTest', foo),
+ ('time', date_c)], result._events)
+
+ def test_skip_preserved(self):
+ subunit_stream = _b('\n'.join([
+ "test: foo",
+ "skip: foo",
+ ""]))
+ result = ExtendedTestResult()
+ result_filter = TestResultFilter(result)
+ self.run_tests(result_filter, subunit_stream)
+ foo = subunit.RemotedTestCase('foo')
+ self.assertEquals(
+ [('startTest', foo),
+ ('addSkip', foo, {}),
+ ('stopTest', foo), ], result._events)
+
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
diff --git a/python/subunit/tests/test_subunit_stats.py b/python/subunit/tests/test_subunit_stats.py
index a7f8fca..6fd3301 100644
--- a/python/subunit/tests/test_subunit_stats.py
+++ b/python/subunit/tests/test_subunit_stats.py
@@ -17,7 +17,8 @@
"""Tests for subunit.TestResultStats."""
import unittest
-from StringIO import StringIO
+
+from testtools.compat import _b, BytesIO, StringIO
import subunit
@@ -28,7 +29,7 @@ class TestTestResultStats(unittest.TestCase):
def setUp(self):
self.output = StringIO()
self.result = subunit.TestResultStats(self.output)
- self.input_stream = StringIO()
+ self.input_stream = BytesIO()
self.test = subunit.ProtocolTestCase(self.input_stream)
def test_stats_empty(self):
@@ -39,7 +40,7 @@ class TestTestResultStats(unittest.TestCase):
self.assertEqual(set(), self.result.seen_tags)
def setUpUsedStream(self):
- self.input_stream.write("""tags: global
+ self.input_stream.write(_b("""tags: global
test passed
success passed
test failed
@@ -51,7 +52,7 @@ test skipped
skip skipped
test todo
xfail todo
-""")
+"""))
self.input_stream.seek(0)
self.test.run(self.result)
diff --git a/python/subunit/tests/test_subunit_tags.py b/python/subunit/tests/test_subunit_tags.py
index 227e2b7..c98506a 100644
--- a/python/subunit/tests/test_subunit_tags.py
+++ b/python/subunit/tests/test_subunit_tags.py
@@ -17,7 +17,8 @@
"""Tests for subunit.tag_stream."""
import unittest
-from StringIO import StringIO
+
+from testtools.compat import StringIO
import subunit
import subunit.test_results
diff --git a/python/subunit/tests/test_tap2subunit.py b/python/subunit/tests/test_tap2subunit.py
index c4ca4cd..11bc191 100644
--- a/python/subunit/tests/test_tap2subunit.py
+++ b/python/subunit/tests/test_tap2subunit.py
@@ -17,7 +17,9 @@
"""Tests for TAP2SubUnit."""
import unittest
-from StringIO import StringIO
+
+from testtools.compat import StringIO
+
import subunit
diff --git a/python/subunit/tests/test_test_protocol.py b/python/subunit/tests/test_test_protocol.py
index 6297ea9..e513ca8 100644
--- a/python/subunit/tests/test_test_protocol.py
+++ b/python/subunit/tests/test_test_protocol.py
@@ -6,7 +6,7 @@
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -16,24 +16,27 @@
import datetime
import unittest
-from StringIO import StringIO
import os
+from testtools import skipIf, TestCase
+from testtools.compat import _b, _u, BytesIO
from testtools.content import Content, TracebackContent
from testtools.content_type import ContentType
-from testtools import try_imports
-Python26TestResult = try_imports(
- ['testtools.testresult.doubles.Python26TestResult',
- 'testtools.tests.helpers.Python26TestResult'])
-Python27TestResult = try_imports(
- ['testtools.testresult.doubles.Python27TestResult',
- 'testtools.tests.helpers.Python27TestResult'])
-ExtendedTestResult = try_imports(
- ['testtools.testresult.doubles.ExtendedTestResult',
- 'testtools.tests.helpers.ExtendedTestResult'])
+try:
+ from testtools.testresult.doubles import (
+ Python26TestResult,
+ Python27TestResult,
+ ExtendedTestResult,
+ )
+except ImportError:
+ from testtools.tests.helpers import (
+ Python26TestResult,
+ Python27TestResult,
+ ExtendedTestResult,
+ )
import subunit
-from subunit import _remote_exception_str
+from subunit import _remote_exception_str, _remote_exception_str_chunked
import subunit.iso8601 as iso8601
@@ -60,23 +63,23 @@ class TestProtocolServerForward(unittest.TestCase):
def test_story(self):
client = unittest.TestResult()
- out = StringIO()
+ out = BytesIO()
protocol = subunit.TestProtocolServer(client, forward_stream=out)
- pipe = StringIO("test old mcdonald\n"
- "success old mcdonald\n")
+ pipe = BytesIO(_b("test old mcdonald\n"
+ "success old mcdonald\n"))
protocol.readFrom(pipe)
self.assertEqual(client.testsRun, 1)
self.assertEqual(pipe.getvalue(), out.getvalue())
def test_not_command(self):
client = unittest.TestResult()
- out = StringIO()
+ out = BytesIO()
protocol = subunit.TestProtocolServer(client,
stream=subunit.DiscardStream(), forward_stream=out)
- pipe = StringIO("success old mcdonald\n")
+ pipe = BytesIO(_b("success old mcdonald\n"))
protocol.readFrom(pipe)
self.assertEqual(client.testsRun, 0)
- self.assertEqual("", out.getvalue())
+ self.assertEqual(_b(""), out.getvalue())
class TestTestProtocolServerPipe(unittest.TestCase):
@@ -84,14 +87,14 @@ class TestTestProtocolServerPipe(unittest.TestCase):
def test_story(self):
client = unittest.TestResult()
protocol = subunit.TestProtocolServer(client)
- pipe = StringIO("test old mcdonald\n"
+ pipe = BytesIO(_b("test old mcdonald\n"
"success old mcdonald\n"
"test bing crosby\n"
"failure bing crosby [\n"
"foo.c:53:ERROR invalid state\n"
"]\n"
"test an error\n"
- "error an error\n")
+ "error an error\n"))
protocol.readFrom(pipe)
bing = subunit.RemotedTestCase("bing crosby")
an_error = subunit.RemotedTestCase("an error")
@@ -99,9 +102,9 @@ class TestTestProtocolServerPipe(unittest.TestCase):
[(an_error, _remote_exception_str + '\n')])
self.assertEqual(
client.failures,
- [(bing, _remote_exception_str + ": Text attachment: traceback\n"
- "------------\nfoo.c:53:ERROR invalid state\n"
- "------------\n\n")])
+ [(bing, _remote_exception_str +
+ ": foo.c:53:ERROR invalid state\n"
+ "\n")])
self.assertEqual(client.testsRun, 3)
def test_non_test_characters_forwarded_immediately(self):
@@ -112,29 +115,32 @@ class TestTestProtocolServerStartTest(unittest.TestCase):
def setUp(self):
self.client = Python26TestResult()
- self.protocol = subunit.TestProtocolServer(self.client)
+ self.stream = BytesIO()
+ self.protocol = subunit.TestProtocolServer(self.client, self.stream)
def test_start_test(self):
- self.protocol.lineReceived("test old mcdonald\n")
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
self.assertEqual(self.client._events,
[('startTest', subunit.RemotedTestCase("old mcdonald"))])
def test_start_testing(self):
- self.protocol.lineReceived("testing old mcdonald\n")
+ self.protocol.lineReceived(_b("testing old mcdonald\n"))
self.assertEqual(self.client._events,
[('startTest', subunit.RemotedTestCase("old mcdonald"))])
def test_start_test_colon(self):
- self.protocol.lineReceived("test: old mcdonald\n")
+ self.protocol.lineReceived(_b("test: old mcdonald\n"))
self.assertEqual(self.client._events,
[('startTest', subunit.RemotedTestCase("old mcdonald"))])
def test_indented_test_colon_ignored(self):
- self.protocol.lineReceived(" test: old mcdonald\n")
+ ignored_line = _b(" test: old mcdonald\n")
+ self.protocol.lineReceived(ignored_line)
self.assertEqual([], self.client._events)
+ self.assertEqual(self.stream.getvalue(), ignored_line)
def test_start_testing_colon(self):
- self.protocol.lineReceived("testing: old mcdonald\n")
+ self.protocol.lineReceived(_b("testing: old mcdonald\n"))
self.assertEqual(self.client._events,
[('startTest', subunit.RemotedTestCase("old mcdonald"))])
@@ -142,22 +148,22 @@ class TestTestProtocolServerStartTest(unittest.TestCase):
class TestTestProtocolServerPassThrough(unittest.TestCase):
def setUp(self):
- self.stdout = StringIO()
+ self.stdout = BytesIO()
self.test = subunit.RemotedTestCase("old mcdonald")
self.client = ExtendedTestResult()
self.protocol = subunit.TestProtocolServer(self.client, self.stdout)
def keywords_before_test(self):
- self.protocol.lineReceived("failure a\n")
- self.protocol.lineReceived("failure: a\n")
- self.protocol.lineReceived("error a\n")
- self.protocol.lineReceived("error: a\n")
- self.protocol.lineReceived("success a\n")
- self.protocol.lineReceived("success: a\n")
- self.protocol.lineReceived("successful a\n")
- self.protocol.lineReceived("successful: a\n")
- self.protocol.lineReceived("]\n")
- self.assertEqual(self.stdout.getvalue(), "failure a\n"
+ self.protocol.lineReceived(_b("failure a\n"))
+ self.protocol.lineReceived(_b("failure: a\n"))
+ self.protocol.lineReceived(_b("error a\n"))
+ self.protocol.lineReceived(_b("error: a\n"))
+ self.protocol.lineReceived(_b("success a\n"))
+ self.protocol.lineReceived(_b("success: a\n"))
+ self.protocol.lineReceived(_b("successful a\n"))
+ self.protocol.lineReceived(_b("successful: a\n"))
+ self.protocol.lineReceived(_b("]\n"))
+ self.assertEqual(self.stdout.getvalue(), _b("failure a\n"
"failure: a\n"
"error a\n"
"error: a\n"
@@ -165,15 +171,15 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
"success: a\n"
"successful a\n"
"successful: a\n"
- "]\n")
+ "]\n"))
def test_keywords_before_test(self):
self.keywords_before_test()
self.assertEqual(self.client._events, [])
def test_keywords_after_error(self):
- self.protocol.lineReceived("test old mcdonald\n")
- self.protocol.lineReceived("error old mcdonald\n")
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
+ self.protocol.lineReceived(_b("error old mcdonald\n"))
self.keywords_before_test()
self.assertEqual([
('startTest', self.test),
@@ -182,8 +188,8 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
], self.client._events)
def test_keywords_after_failure(self):
- self.protocol.lineReceived("test old mcdonald\n")
- self.protocol.lineReceived("failure old mcdonald\n")
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
+ self.protocol.lineReceived(_b("failure old mcdonald\n"))
self.keywords_before_test()
self.assertEqual(self.client._events, [
('startTest', self.test),
@@ -192,8 +198,8 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
])
def test_keywords_after_success(self):
- self.protocol.lineReceived("test old mcdonald\n")
- self.protocol.lineReceived("success old mcdonald\n")
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
+ self.protocol.lineReceived(_b("success old mcdonald\n"))
self.keywords_before_test()
self.assertEqual([
('startTest', self.test),
@@ -202,19 +208,19 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
], self.client._events)
def test_keywords_after_test(self):
- self.protocol.lineReceived("test old mcdonald\n")
- self.protocol.lineReceived("test old mcdonald\n")
- self.protocol.lineReceived("failure a\n")
- self.protocol.lineReceived("failure: a\n")
- self.protocol.lineReceived("error a\n")
- self.protocol.lineReceived("error: a\n")
- self.protocol.lineReceived("success a\n")
- self.protocol.lineReceived("success: a\n")
- self.protocol.lineReceived("successful a\n")
- self.protocol.lineReceived("successful: a\n")
- self.protocol.lineReceived("]\n")
- self.protocol.lineReceived("failure old mcdonald\n")
- self.assertEqual(self.stdout.getvalue(), "test old mcdonald\n"
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
+ self.protocol.lineReceived(_b("failure a\n"))
+ self.protocol.lineReceived(_b("failure: a\n"))
+ self.protocol.lineReceived(_b("error a\n"))
+ self.protocol.lineReceived(_b("error: a\n"))
+ self.protocol.lineReceived(_b("success a\n"))
+ self.protocol.lineReceived(_b("success: a\n"))
+ self.protocol.lineReceived(_b("successful a\n"))
+ self.protocol.lineReceived(_b("successful: a\n"))
+ self.protocol.lineReceived(_b("]\n"))
+ self.protocol.lineReceived(_b("failure old mcdonald\n"))
+ self.assertEqual(self.stdout.getvalue(), _b("test old mcdonald\n"
"failure a\n"
"failure: a\n"
"error a\n"
@@ -223,7 +229,7 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
"success: a\n"
"successful a\n"
"successful: a\n"
- "]\n")
+ "]\n"))
self.assertEqual(self.client._events, [
('startTest', self.test),
('addFailure', self.test, {}),
@@ -233,24 +239,24 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
def test_keywords_during_failure(self):
# A smoke test to make sure that the details parsers have control
# appropriately.
- self.protocol.lineReceived("test old mcdonald\n")
- self.protocol.lineReceived("failure: old mcdonald [\n")
- self.protocol.lineReceived("test old mcdonald\n")
- self.protocol.lineReceived("failure a\n")
- self.protocol.lineReceived("failure: a\n")
- self.protocol.lineReceived("error a\n")
- self.protocol.lineReceived("error: a\n")
- self.protocol.lineReceived("success a\n")
- self.protocol.lineReceived("success: a\n")
- self.protocol.lineReceived("successful a\n")
- self.protocol.lineReceived("successful: a\n")
- self.protocol.lineReceived(" ]\n")
- self.protocol.lineReceived("]\n")
- self.assertEqual(self.stdout.getvalue(), "")
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
+ self.protocol.lineReceived(_b("failure: old mcdonald [\n"))
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
+ self.protocol.lineReceived(_b("failure a\n"))
+ self.protocol.lineReceived(_b("failure: a\n"))
+ self.protocol.lineReceived(_b("error a\n"))
+ self.protocol.lineReceived(_b("error: a\n"))
+ self.protocol.lineReceived(_b("success a\n"))
+ self.protocol.lineReceived(_b("success: a\n"))
+ self.protocol.lineReceived(_b("successful a\n"))
+ self.protocol.lineReceived(_b("successful: a\n"))
+ self.protocol.lineReceived(_b(" ]\n"))
+ self.protocol.lineReceived(_b("]\n"))
+ self.assertEqual(self.stdout.getvalue(), _b(""))
details = {}
details['traceback'] = Content(ContentType("text", "x-traceback",
{'charset': 'utf8'}),
- lambda:[
+ lambda:[_b(
"test old mcdonald\n"
"failure a\n"
"failure: a\n"
@@ -260,7 +266,7 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
"success: a\n"
"successful a\n"
"successful: a\n"
- "]\n"])
+ "]\n")])
self.assertEqual(self.client._events, [
('startTest', self.test),
('addFailure', self.test, details),
@@ -271,7 +277,7 @@ class TestTestProtocolServerPassThrough(unittest.TestCase):
"""Lines received which cannot be interpreted as any protocol action
should be passed through to sys.stdout.
"""
- bytes = "randombytes\n"
+ bytes = _b("randombytes\n")
self.protocol.lineReceived(bytes)
self.assertEqual(self.stdout.getvalue(), bytes)
@@ -288,10 +294,10 @@ class TestTestProtocolServerLostConnection(unittest.TestCase):
self.assertEqual([], self.client._events)
def test_lost_connection_after_start(self):
- self.protocol.lineReceived("test old mcdonald\n")
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
self.protocol.lostConnection()
failure = subunit.RemoteError(
- u"lost connection during test 'old mcdonald'")
+ _u("lost connection during test 'old mcdonald'"))
self.assertEqual([
('startTest', self.test),
('addError', self.test, failure),
@@ -299,21 +305,21 @@ class TestTestProtocolServerLostConnection(unittest.TestCase):
], self.client._events)
def test_lost_connected_after_error(self):
- self.protocol.lineReceived("test old mcdonald\n")
- self.protocol.lineReceived("error old mcdonald\n")
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
+ self.protocol.lineReceived(_b("error old mcdonald\n"))
self.protocol.lostConnection()
self.assertEqual([
('startTest', self.test),
- ('addError', self.test, subunit.RemoteError(u"")),
+ ('addError', self.test, subunit.RemoteError(_u(""))),
('stopTest', self.test),
], self.client._events)
def do_connection_lost(self, outcome, opening):
- self.protocol.lineReceived("test old mcdonald\n")
- self.protocol.lineReceived("%s old mcdonald %s" % (outcome, opening))
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
+ self.protocol.lineReceived(_b("%s old mcdonald %s" % (outcome, opening)))
self.protocol.lostConnection()
failure = subunit.RemoteError(
- u"lost connection during %s report of test 'old mcdonald'" %
+ _u("lost connection during %s report of test 'old mcdonald'") %
outcome)
self.assertEqual([
('startTest', self.test),
@@ -328,12 +334,12 @@ class TestTestProtocolServerLostConnection(unittest.TestCase):
self.do_connection_lost("error", "[ multipart\n")
def test_lost_connected_after_failure(self):
- self.protocol.lineReceived("test old mcdonald\n")
- self.protocol.lineReceived("failure old mcdonald\n")
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
+ self.protocol.lineReceived(_b("failure old mcdonald\n"))
self.protocol.lostConnection()
self.assertEqual([
('startTest', self.test),
- ('addFailure', self.test, subunit.RemoteError(u"")),
+ ('addFailure', self.test, subunit.RemoteError(_u(""))),
('stopTest', self.test),
], self.client._events)
@@ -344,8 +350,8 @@ class TestTestProtocolServerLostConnection(unittest.TestCase):
self.do_connection_lost("failure", "[ multipart\n")
def test_lost_connection_after_success(self):
- self.protocol.lineReceived("test old mcdonald\n")
- self.protocol.lineReceived("success old mcdonald\n")
+ self.protocol.lineReceived(_b("test old mcdonald\n"))
+ self.protocol.lineReceived(_b("success old mcdonald\n"))
self.protocol.lostConnection()
self.assertEqual([
('startTest', self.test),
@@ -371,18 +377,24 @@ class TestTestProtocolServerLostConnection(unittest.TestCase):
def test_lost_connection_during_xfail_details(self):
self.do_connection_lost("xfail", "[ multipart\n")
+ def test_lost_connection_during_uxsuccess(self):
+ self.do_connection_lost("uxsuccess", "[\n")
+
+ def test_lost_connection_during_uxsuccess_details(self):
+ self.do_connection_lost("uxsuccess", "[ multipart\n")
+
class TestInTestMultipart(unittest.TestCase):
def setUp(self):
self.client = ExtendedTestResult()
self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived("test mcdonalds farm\n")
- self.test = subunit.RemotedTestCase("mcdonalds farm")
+ self.protocol.lineReceived(_b("test mcdonalds farm\n"))
+ self.test = subunit.RemotedTestCase(_u("mcdonalds farm"))
def test__outcome_sets_details_parser(self):
self.protocol._reading_success_details.details_parser = None
- self.protocol._state._outcome(0, "mcdonalds farm [ multipart\n",
+ self.protocol._state._outcome(0, _b("mcdonalds farm [ multipart\n"),
None, self.protocol._reading_success_details)
parser = self.protocol._reading_success_details.details_parser
self.assertNotEqual(None, parser)
@@ -395,11 +407,11 @@ class TestTestProtocolServerAddError(unittest.TestCase):
def setUp(self):
self.client = ExtendedTestResult()
self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived("test mcdonalds farm\n")
+ self.protocol.lineReceived(_b("test mcdonalds farm\n"))
self.test = subunit.RemotedTestCase("mcdonalds farm")
def simple_error_keyword(self, keyword):
- self.protocol.lineReceived("%s mcdonalds farm\n" % keyword)
+ self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
details = {}
self.assertEqual([
('startTest', self.test),
@@ -414,11 +426,11 @@ class TestTestProtocolServerAddError(unittest.TestCase):
self.simple_error_keyword("error:")
def test_error_empty_message(self):
- self.protocol.lineReceived("error mcdonalds farm [\n")
- self.protocol.lineReceived("]\n")
+ self.protocol.lineReceived(_b("error mcdonalds farm [\n"))
+ self.protocol.lineReceived(_b("]\n"))
details = {}
details['traceback'] = Content(ContentType("text", "x-traceback",
- {'charset': 'utf8'}), lambda:[""])
+ {'charset': 'utf8'}), lambda:[_b("")])
self.assertEqual([
('startTest', self.test),
('addError', self.test, details),
@@ -426,12 +438,12 @@ class TestTestProtocolServerAddError(unittest.TestCase):
], self.client._events)
def error_quoted_bracket(self, keyword):
- self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
- self.protocol.lineReceived(" ]\n")
- self.protocol.lineReceived("]\n")
+ self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
+ self.protocol.lineReceived(_b(" ]\n"))
+ self.protocol.lineReceived(_b("]\n"))
details = {}
details['traceback'] = Content(ContentType("text", "x-traceback",
- {'charset': 'utf8'}), lambda:["]\n"])
+ {'charset': 'utf8'}), lambda:[_b("]\n")])
self.assertEqual([
('startTest', self.test),
('addError', self.test, details),
@@ -450,7 +462,7 @@ class TestTestProtocolServerAddFailure(unittest.TestCase):
def setUp(self):
self.client = ExtendedTestResult()
self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived("test mcdonalds farm\n")
+ self.protocol.lineReceived(_b("test mcdonalds farm\n"))
self.test = subunit.RemotedTestCase("mcdonalds farm")
def assertFailure(self, details):
@@ -461,7 +473,7 @@ class TestTestProtocolServerAddFailure(unittest.TestCase):
], self.client._events)
def simple_failure_keyword(self, keyword):
- self.protocol.lineReceived("%s mcdonalds farm\n" % keyword)
+ self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
details = {}
self.assertFailure(details)
@@ -472,20 +484,20 @@ class TestTestProtocolServerAddFailure(unittest.TestCase):
self.simple_failure_keyword("failure:")
def test_failure_empty_message(self):
- self.protocol.lineReceived("failure mcdonalds farm [\n")
- self.protocol.lineReceived("]\n")
+ self.protocol.lineReceived(_b("failure mcdonalds farm [\n"))
+ self.protocol.lineReceived(_b("]\n"))
details = {}
details['traceback'] = Content(ContentType("text", "x-traceback",
- {'charset': 'utf8'}), lambda:[""])
+ {'charset': 'utf8'}), lambda:[_b("")])
self.assertFailure(details)
def failure_quoted_bracket(self, keyword):
- self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
- self.protocol.lineReceived(" ]\n")
- self.protocol.lineReceived("]\n")
+ self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
+ self.protocol.lineReceived(_b(" ]\n"))
+ self.protocol.lineReceived(_b("]\n"))
details = {}
details['traceback'] = Content(ContentType("text", "x-traceback",
- {'charset': 'utf8'}), lambda:["]\n"])
+ {'charset': 'utf8'}), lambda:[_b("]\n")])
self.assertFailure(details)
def test_failure_quoted_bracket(self):
@@ -523,11 +535,11 @@ class TestTestProtocolServerAddxFail(unittest.TestCase):
def setup_protocol(self):
"""Setup the protocol based on self.client."""
self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived("test mcdonalds farm\n")
+ self.protocol.lineReceived(_b("test mcdonalds farm\n"))
self.test = self.client._events[-1][-1]
def simple_xfail_keyword(self, keyword, as_success):
- self.protocol.lineReceived("%s mcdonalds farm\n" % keyword)
+ self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
self.check_success_or_xfail(as_success)
def check_success_or_xfail(self, as_success, error_message=None):
@@ -542,13 +554,14 @@ class TestTestProtocolServerAddxFail(unittest.TestCase):
if error_message is not None:
details['traceback'] = Content(
ContentType("text", "x-traceback", {'charset': 'utf8'}),
- lambda:[error_message])
+ lambda:[_b(error_message)])
if isinstance(self.client, ExtendedTestResult):
value = details
else:
if error_message is not None:
- value = subunit.RemoteError(u'Text attachment: traceback\n'
- '------------\n' + error_message + '------------\n')
+ if not len(error_message.strip()):
+ error_message = _u("Empty attachments:\n traceback\n")
+ value = subunit.RemoteError(_u(error_message))
else:
value = subunit.RemoteError()
self.assertEqual([
@@ -582,16 +595,16 @@ class TestTestProtocolServerAddxFail(unittest.TestCase):
self.empty_message(False, error_message="")
def empty_message(self, as_success, error_message="\n"):
- self.protocol.lineReceived("xfail mcdonalds farm [\n")
- self.protocol.lineReceived("]\n")
+ self.protocol.lineReceived(_b("xfail mcdonalds farm [\n"))
+ self.protocol.lineReceived(_b("]\n"))
self.check_success_or_xfail(as_success, error_message)
def xfail_quoted_bracket(self, keyword, as_success):
# This tests it is accepted, but cannot test it is used today, because
# of not having a way to expose it in Python so far.
- self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
- self.protocol.lineReceived(" ]\n")
- self.protocol.lineReceived("]\n")
+ self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
+ self.protocol.lineReceived(_b(" ]\n"))
+ self.protocol.lineReceived(_b("]\n"))
self.check_success_or_xfail(as_success, "]\n")
def test_xfail_quoted_bracket(self):
@@ -611,6 +624,121 @@ class TestTestProtocolServerAddxFail(unittest.TestCase):
self.xfail_quoted_bracket("xfail:", False)
+class TestTestProtocolServerAddunexpectedSuccess(TestCase):
+ """Tests for the uxsuccess keyword."""
+
+ def capture_expected_failure(self, test, err):
+ self._events.append((test, err))
+
+ def setup_python26(self):
+ """Setup a test object ready to be xfailed and thunk to success."""
+ self.client = Python26TestResult()
+ self.setup_protocol()
+
+ def setup_python27(self):
+ """Setup a test object ready to be xfailed."""
+ self.client = Python27TestResult()
+ self.setup_protocol()
+
+ def setup_python_ex(self):
+ """Setup a test object ready to be xfailed with details."""
+ self.client = ExtendedTestResult()
+ self.setup_protocol()
+
+ def setup_protocol(self):
+ """Setup the protocol based on self.client."""
+ self.protocol = subunit.TestProtocolServer(self.client)
+ self.protocol.lineReceived(_b("test mcdonalds farm\n"))
+ self.test = self.client._events[-1][-1]
+
+ def simple_uxsuccess_keyword(self, keyword, as_fail):
+ self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
+ self.check_fail_or_uxsuccess(as_fail)
+
+ def check_fail_or_uxsuccess(self, as_fail, error_message=None):
+ details = {}
+ if error_message is not None:
+ details['traceback'] = Content(
+ ContentType("text", "x-traceback", {'charset': 'utf8'}),
+ lambda:[_b(error_message)])
+ if isinstance(self.client, ExtendedTestResult):
+ value = details
+ else:
+ value = None
+ if as_fail:
+ self.client._events[1] = self.client._events[1][:2]
+ # The value is generated within the extended to original decorator:
+ # todo use the testtools matcher to check on this.
+ self.assertEqual([
+ ('startTest', self.test),
+ ('addFailure', self.test),
+ ('stopTest', self.test),
+ ], self.client._events)
+ elif value:
+ self.assertEqual([
+ ('startTest', self.test),
+ ('addUnexpectedSuccess', self.test, value),
+ ('stopTest', self.test),
+ ], self.client._events)
+ else:
+ self.assertEqual([
+ ('startTest', self.test),
+ ('addUnexpectedSuccess', self.test),
+ ('stopTest', self.test),
+ ], self.client._events)
+
+ def test_simple_uxsuccess(self):
+ self.setup_python26()
+ self.simple_uxsuccess_keyword("uxsuccess", True)
+ self.setup_python27()
+ self.simple_uxsuccess_keyword("uxsuccess", False)
+ self.setup_python_ex()
+ self.simple_uxsuccess_keyword("uxsuccess", False)
+
+ def test_simple_uxsuccess_colon(self):
+ self.setup_python26()
+ self.simple_uxsuccess_keyword("uxsuccess:", True)
+ self.setup_python27()
+ self.simple_uxsuccess_keyword("uxsuccess:", False)
+ self.setup_python_ex()
+ self.simple_uxsuccess_keyword("uxsuccess:", False)
+
+ def test_uxsuccess_empty_message(self):
+ self.setup_python26()
+ self.empty_message(True)
+ self.setup_python27()
+ self.empty_message(False)
+ self.setup_python_ex()
+ self.empty_message(False, error_message="")
+
+ def empty_message(self, as_fail, error_message="\n"):
+ self.protocol.lineReceived(_b("uxsuccess mcdonalds farm [\n"))
+ self.protocol.lineReceived(_b("]\n"))
+ self.check_fail_or_uxsuccess(as_fail, error_message)
+
+ def uxsuccess_quoted_bracket(self, keyword, as_fail):
+ self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
+ self.protocol.lineReceived(_b(" ]\n"))
+ self.protocol.lineReceived(_b("]\n"))
+ self.check_fail_or_uxsuccess(as_fail, "]\n")
+
+ def test_uxsuccess_quoted_bracket(self):
+ self.setup_python26()
+ self.uxsuccess_quoted_bracket("uxsuccess", True)
+ self.setup_python27()
+ self.uxsuccess_quoted_bracket("uxsuccess", False)
+ self.setup_python_ex()
+ self.uxsuccess_quoted_bracket("uxsuccess", False)
+
+ def test_uxsuccess_colon_quoted_bracket(self):
+ self.setup_python26()
+ self.uxsuccess_quoted_bracket("uxsuccess:", True)
+ self.setup_python27()
+ self.uxsuccess_quoted_bracket("uxsuccess:", False)
+ self.setup_python_ex()
+ self.uxsuccess_quoted_bracket("uxsuccess:", False)
+
+
class TestTestProtocolServerAddSkip(unittest.TestCase):
"""Tests for the skip keyword.
@@ -622,7 +750,7 @@ class TestTestProtocolServerAddSkip(unittest.TestCase):
"""Setup a test object ready to be skipped."""
self.client = ExtendedTestResult()
self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived("test mcdonalds farm\n")
+ self.protocol.lineReceived(_b("test mcdonalds farm\n"))
self.test = self.client._events[-1][-1]
def assertSkip(self, reason):
@@ -637,7 +765,7 @@ class TestTestProtocolServerAddSkip(unittest.TestCase):
], self.client._events)
def simple_skip_keyword(self, keyword):
- self.protocol.lineReceived("%s mcdonalds farm\n" % keyword)
+ self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
self.assertSkip(None)
def test_simple_skip(self):
@@ -647,17 +775,17 @@ class TestTestProtocolServerAddSkip(unittest.TestCase):
self.simple_skip_keyword("skip:")
def test_skip_empty_message(self):
- self.protocol.lineReceived("skip mcdonalds farm [\n")
- self.protocol.lineReceived("]\n")
- self.assertSkip("")
+ self.protocol.lineReceived(_b("skip mcdonalds farm [\n"))
+ self.protocol.lineReceived(_b("]\n"))
+ self.assertSkip(_b(""))
def skip_quoted_bracket(self, keyword):
# This tests it is accepted, but cannot test it is used today, because
# of not having a way to expose it in Python so far.
- self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
- self.protocol.lineReceived(" ]\n")
- self.protocol.lineReceived("]\n")
- self.assertSkip("]\n")
+ self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
+ self.protocol.lineReceived(_b(" ]\n"))
+ self.protocol.lineReceived(_b("]\n"))
+ self.assertSkip(_b("]\n"))
def test_skip_quoted_bracket(self):
self.skip_quoted_bracket("skip")
@@ -671,11 +799,11 @@ class TestTestProtocolServerAddSuccess(unittest.TestCase):
def setUp(self):
self.client = ExtendedTestResult()
self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived("test mcdonalds farm\n")
+ self.protocol.lineReceived(_b("test mcdonalds farm\n"))
self.test = subunit.RemotedTestCase("mcdonalds farm")
def simple_success_keyword(self, keyword):
- self.protocol.lineReceived("%s mcdonalds farm\n" % keyword)
+ self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
self.assertEqual([
('startTest', self.test),
('addSuccess', self.test),
@@ -696,22 +824,22 @@ class TestTestProtocolServerAddSuccess(unittest.TestCase):
], self.client._events)
def test_success_empty_message(self):
- self.protocol.lineReceived("success mcdonalds farm [\n")
- self.protocol.lineReceived("]\n")
+ self.protocol.lineReceived(_b("success mcdonalds farm [\n"))
+ self.protocol.lineReceived(_b("]\n"))
details = {}
details['message'] = Content(ContentType("text", "plain"),
- lambda:[""])
+ lambda:[_b("")])
self.assertSuccess(details)
def success_quoted_bracket(self, keyword):
# This tests it is accepted, but cannot test it is used today, because
# of not having a way to expose it in Python so far.
- self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
- self.protocol.lineReceived(" ]\n")
- self.protocol.lineReceived("]\n")
+ self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
+ self.protocol.lineReceived(_b(" ]\n"))
+ self.protocol.lineReceived(_b("]\n"))
details = {}
details['message'] = Content(ContentType("text", "plain"),
- lambda:["]\n"])
+ lambda:[_b("]\n")])
self.assertSuccess(details)
def test_success_quoted_bracket(self):
@@ -726,26 +854,26 @@ class TestTestProtocolServerProgress(unittest.TestCase):
def test_progress_accepted_stdlib(self):
self.result = Python26TestResult()
- self.stream = StringIO()
+ self.stream = BytesIO()
self.protocol = subunit.TestProtocolServer(self.result,
stream=self.stream)
- self.protocol.lineReceived("progress: 23")
- self.protocol.lineReceived("progress: -2")
- self.protocol.lineReceived("progress: +4")
- self.assertEqual("", self.stream.getvalue())
+ self.protocol.lineReceived(_b("progress: 23"))
+ self.protocol.lineReceived(_b("progress: -2"))
+ self.protocol.lineReceived(_b("progress: +4"))
+ self.assertEqual(_b(""), self.stream.getvalue())
def test_progress_accepted_extended(self):
# With a progress capable TestResult, progress events are emitted.
self.result = ExtendedTestResult()
- self.stream = StringIO()
+ self.stream = BytesIO()
self.protocol = subunit.TestProtocolServer(self.result,
stream=self.stream)
- self.protocol.lineReceived("progress: 23")
- self.protocol.lineReceived("progress: push")
- self.protocol.lineReceived("progress: -2")
- self.protocol.lineReceived("progress: pop")
- self.protocol.lineReceived("progress: +4")
- self.assertEqual("", self.stream.getvalue())
+ self.protocol.lineReceived(_b("progress: 23"))
+ self.protocol.lineReceived(_b("progress: push"))
+ self.protocol.lineReceived(_b("progress: -2"))
+ self.protocol.lineReceived(_b("progress: pop"))
+ self.protocol.lineReceived(_b("progress: +4"))
+ self.assertEqual(_b(""), self.stream.getvalue())
self.assertEqual([
('progress', 23, subunit.PROGRESS_SET),
('progress', None, subunit.PROGRESS_PUSH),
@@ -763,33 +891,33 @@ class TestTestProtocolServerStreamTags(unittest.TestCase):
self.protocol = subunit.TestProtocolServer(self.client)
def test_initial_tags(self):
- self.protocol.lineReceived("tags: foo bar:baz quux\n")
+ self.protocol.lineReceived(_b("tags: foo bar:baz quux\n"))
self.assertEqual([
('tags', set(["foo", "bar:baz", "quux"]), set()),
], self.client._events)
def test_minus_removes_tags(self):
- self.protocol.lineReceived("tags: -bar quux\n")
+ self.protocol.lineReceived(_b("tags: -bar quux\n"))
self.assertEqual([
('tags', set(["quux"]), set(["bar"])),
], self.client._events)
def test_tags_do_not_get_set_on_test(self):
- self.protocol.lineReceived("test mcdonalds farm\n")
+ self.protocol.lineReceived(_b("test mcdonalds farm\n"))
test = self.client._events[0][-1]
self.assertEqual(None, getattr(test, 'tags', None))
def test_tags_do_not_get_set_on_global_tags(self):
- self.protocol.lineReceived("tags: foo bar\n")
- self.protocol.lineReceived("test mcdonalds farm\n")
+ self.protocol.lineReceived(_b("tags: foo bar\n"))
+ self.protocol.lineReceived(_b("test mcdonalds farm\n"))
test = self.client._events[-1][-1]
self.assertEqual(None, getattr(test, 'tags', None))
def test_tags_get_set_on_test_tags(self):
- self.protocol.lineReceived("test mcdonalds farm\n")
+ self.protocol.lineReceived(_b("test mcdonalds farm\n"))
test = self.client._events[-1][-1]
- self.protocol.lineReceived("tags: foo bar\n")
- self.protocol.lineReceived("success mcdonalds farm\n")
+ self.protocol.lineReceived(_b("tags: foo bar\n"))
+ self.protocol.lineReceived(_b("success mcdonalds farm\n"))
self.assertEqual(None, getattr(test, 'tags', None))
@@ -798,19 +926,19 @@ class TestTestProtocolServerStreamTime(unittest.TestCase):
def test_time_accepted_stdlib(self):
self.result = Python26TestResult()
- self.stream = StringIO()
+ self.stream = BytesIO()
self.protocol = subunit.TestProtocolServer(self.result,
stream=self.stream)
- self.protocol.lineReceived("time: 2001-12-12 12:59:59Z\n")
- self.assertEqual("", self.stream.getvalue())
+ self.protocol.lineReceived(_b("time: 2001-12-12 12:59:59Z\n"))
+ self.assertEqual(_b(""), self.stream.getvalue())
def test_time_accepted_extended(self):
self.result = ExtendedTestResult()
- self.stream = StringIO()
+ self.stream = BytesIO()
self.protocol = subunit.TestProtocolServer(self.result,
stream=self.stream)
- self.protocol.lineReceived("time: 2001-12-12 12:59:59Z\n")
- self.assertEqual("", self.stream.getvalue())
+ self.protocol.lineReceived(_b("time: 2001-12-12 12:59:59Z\n"))
+ self.assertEqual(_b(""), self.stream.getvalue())
self.assertEqual([
('time', datetime.datetime(2001, 12, 12, 12, 59, 59, 0,
iso8601.Utc()))
@@ -846,15 +974,15 @@ class TestRemotedTestCase(unittest.TestCase):
class TestRemoteError(unittest.TestCase):
def test_eq(self):
- error = subunit.RemoteError(u"Something went wrong")
- another_error = subunit.RemoteError(u"Something went wrong")
- different_error = subunit.RemoteError(u"boo!")
+ error = subunit.RemoteError(_u("Something went wrong"))
+ another_error = subunit.RemoteError(_u("Something went wrong"))
+ different_error = subunit.RemoteError(_u("boo!"))
self.assertEqual(error, another_error)
self.assertNotEqual(error, different_error)
self.assertNotEqual(different_error, another_error)
def test_empty_constructor(self):
- self.assertEqual(subunit.RemoteError(), subunit.RemoteError(u""))
+ self.assertEqual(subunit.RemoteError(), subunit.RemoteError(_u("")))
class TestExecTestCase(unittest.TestCase):
@@ -889,7 +1017,7 @@ class TestExecTestCase(unittest.TestCase):
bing = subunit.RemotedTestCase("bing crosby")
bing_details = {}
bing_details['traceback'] = Content(ContentType("text", "x-traceback",
- {'charset': 'utf8'}), lambda:["foo.c:53:ERROR invalid state\n"])
+ {'charset': 'utf8'}), lambda:[_b("foo.c:53:ERROR invalid state\n")])
an_error = subunit.RemotedTestCase("an error")
error_details = {}
self.assertEqual([
@@ -913,7 +1041,8 @@ class TestExecTestCase(unittest.TestCase):
def test_join_dir(self):
sibling = subunit.join_dir(__file__, 'foo')
- expected = '%s/foo' % (os.path.split(__file__)[0],)
+ filedir = os.path.abspath(os.path.dirname(__file__))
+ expected = os.path.join(filedir, 'foo')
self.assertEqual(sibling, expected)
@@ -923,7 +1052,7 @@ class DoExecTestCase(subunit.ExecTestCase):
"""sample-two-script.py"""
-class TestIsolatedTestCase(unittest.TestCase):
+class TestIsolatedTestCase(TestCase):
class SampleIsolatedTestCase(subunit.IsolatedTestCase):
@@ -944,6 +1073,7 @@ class TestIsolatedTestCase(unittest.TestCase):
def test_construct(self):
self.SampleIsolatedTestCase("test_sets_global_state")
+ @skipIf(os.name != "posix", "Need a posix system for forking tests")
def test_run(self):
result = unittest.TestResult()
test = self.SampleIsolatedTestCase("test_sets_global_state")
@@ -959,7 +1089,7 @@ class TestIsolatedTestCase(unittest.TestCase):
#test.debug()
-class TestIsolatedTestSuite(unittest.TestCase):
+class TestIsolatedTestSuite(TestCase):
class SampleTestToIsolate(unittest.TestCase):
@@ -980,6 +1110,7 @@ class TestIsolatedTestSuite(unittest.TestCase):
def test_construct(self):
subunit.IsolatedTestSuite()
+ @skipIf(os.name != "posix", "Need a posix system for forking tests")
def test_run(self):
result = unittest.TestResult()
suite = subunit.IsolatedTestSuite()
@@ -998,48 +1129,48 @@ class TestIsolatedTestSuite(unittest.TestCase):
class TestTestProtocolClient(unittest.TestCase):
def setUp(self):
- self.io = StringIO()
+ self.io = BytesIO()
self.protocol = subunit.TestProtocolClient(self.io)
self.test = TestTestProtocolClient("test_start_test")
self.sample_details = {'something':Content(
- ContentType('text', 'plain'), lambda:['serialised\nform'])}
+ ContentType('text', 'plain'), lambda:[_b('serialised\nform')])}
self.sample_tb_details = dict(self.sample_details)
self.sample_tb_details['traceback'] = TracebackContent(
- subunit.RemoteError(u"boo qux"), self.test)
+ subunit.RemoteError(_u("boo qux")), self.test)
def test_start_test(self):
"""Test startTest on a TestProtocolClient."""
self.protocol.startTest(self.test)
- self.assertEqual(self.io.getvalue(), "test: %s\n" % self.test.id())
+ self.assertEqual(self.io.getvalue(), _b("test: %s\n" % self.test.id()))
def test_stop_test(self):
# stopTest doesn't output anything.
self.protocol.stopTest(self.test)
- self.assertEqual(self.io.getvalue(), "")
+ self.assertEqual(self.io.getvalue(), _b(""))
def test_add_success(self):
"""Test addSuccess on a TestProtocolClient."""
self.protocol.addSuccess(self.test)
self.assertEqual(
- self.io.getvalue(), "successful: %s\n" % self.test.id())
+ self.io.getvalue(), _b("successful: %s\n" % self.test.id()))
def test_add_success_details(self):
"""Test addSuccess on a TestProtocolClient with details."""
self.protocol.addSuccess(self.test, details=self.sample_details)
self.assertEqual(
- self.io.getvalue(), "successful: %s [ multipart\n"
+ self.io.getvalue(), _b("successful: %s [ multipart\n"
"Content-Type: text/plain\n"
"something\n"
- "F\r\nserialised\nform0\r\n]\n" % self.test.id())
+ "F\r\nserialised\nform0\r\n]\n" % self.test.id()))
def test_add_failure(self):
"""Test addFailure on a TestProtocolClient."""
self.protocol.addFailure(
- self.test, subunit.RemoteError(u"boo qux"))
+ self.test, subunit.RemoteError(_u("boo qux")))
self.assertEqual(
self.io.getvalue(),
- ('failure: %s [\n' + _remote_exception_str + ': boo qux\n]\n')
- % self.test.id())
+ _b(('failure: %s [\n' + _remote_exception_str + ': boo qux\n]\n')
+ % self.test.id()))
def test_add_failure_details(self):
"""Test addFailure on a TestProtocolClient with details."""
@@ -1047,24 +1178,23 @@ class TestTestProtocolClient(unittest.TestCase):
self.test, details=self.sample_tb_details)
self.assertEqual(
self.io.getvalue(),
- ("failure: %s [ multipart\n"
+ _b(("failure: %s [ multipart\n"
"Content-Type: text/plain\n"
"something\n"
"F\r\nserialised\nform0\r\n"
"Content-Type: text/x-traceback;charset=utf8,language=python\n"
- "traceback\n"
- "1A\r\n" + _remote_exception_str + ": boo qux\n0\r\n"
- "]\n") % self.test.id())
+ "traceback\n" + _remote_exception_str_chunked + ": boo qux\n0\r\n"
+ "]\n") % self.test.id()))
def test_add_error(self):
"""Test stopTest on a TestProtocolClient."""
self.protocol.addError(
- self.test, subunit.RemoteError(u"phwoar crikey"))
+ self.test, subunit.RemoteError(_u("phwoar crikey")))
self.assertEqual(
self.io.getvalue(),
- ('error: %s [\n' +
+ _b(('error: %s [\n' +
_remote_exception_str + ": phwoar crikey\n"
- "]\n") % self.test.id())
+ "]\n") % self.test.id()))
def test_add_error_details(self):
"""Test stopTest on a TestProtocolClient with details."""
@@ -1072,24 +1202,23 @@ class TestTestProtocolClient(unittest.TestCase):
self.test, details=self.sample_tb_details)
self.assertEqual(
self.io.getvalue(),
- ("error: %s [ multipart\n"
+ _b(("error: %s [ multipart\n"
"Content-Type: text/plain\n"
"something\n"
"F\r\nserialised\nform0\r\n"
"Content-Type: text/x-traceback;charset=utf8,language=python\n"
- "traceback\n"
- "1A\r\n" + _remote_exception_str + ": boo qux\n0\r\n"
- "]\n") % self.test.id())
+ "traceback\n" + _remote_exception_str_chunked + ": boo qux\n0\r\n"
+ "]\n") % self.test.id()))
def test_add_expected_failure(self):
"""Test addExpectedFailure on a TestProtocolClient."""
self.protocol.addExpectedFailure(
- self.test, subunit.RemoteError(u"phwoar crikey"))
+ self.test, subunit.RemoteError(_u("phwoar crikey")))
self.assertEqual(
self.io.getvalue(),
- ('xfail: %s [\n' +
+ _b(('xfail: %s [\n' +
_remote_exception_str + ": phwoar crikey\n"
- "]\n") % self.test.id())
+ "]\n") % self.test.id()))
def test_add_expected_failure_details(self):
"""Test addExpectedFailure on a TestProtocolClient with details."""
@@ -1097,14 +1226,14 @@ class TestTestProtocolClient(unittest.TestCase):
self.test, details=self.sample_tb_details)
self.assertEqual(
self.io.getvalue(),
- ("xfail: %s [ multipart\n"
+ _b(("xfail: %s [ multipart\n"
"Content-Type: text/plain\n"
"something\n"
"F\r\nserialised\nform0\r\n"
"Content-Type: text/x-traceback;charset=utf8,language=python\n"
- "traceback\n"
- "1A\r\n"+ _remote_exception_str + ": boo qux\n0\r\n"
- "]\n") % self.test.id())
+ "traceback\n" + _remote_exception_str_chunked + ": boo qux\n0\r\n"
+ "]\n") % self.test.id()))
+
def test_add_skip(self):
"""Test addSkip on a TestProtocolClient."""
@@ -1112,64 +1241,79 @@ class TestTestProtocolClient(unittest.TestCase):
self.test, "Has it really?")
self.assertEqual(
self.io.getvalue(),
- 'skip: %s [\nHas it really?\n]\n' % self.test.id())
-
+ _b('skip: %s [\nHas it really?\n]\n' % self.test.id()))
+
def test_add_skip_details(self):
"""Test addSkip on a TestProtocolClient with details."""
details = {'reason':Content(
- ContentType('text', 'plain'), lambda:['Has it really?'])}
- self.protocol.addSkip(
- self.test, details=details)
+ ContentType('text', 'plain'), lambda:[_b('Has it really?')])}
+ self.protocol.addSkip(self.test, details=details)
self.assertEqual(
self.io.getvalue(),
- "skip: %s [ multipart\n"
+ _b("skip: %s [ multipart\n"
"Content-Type: text/plain\n"
"reason\n"
"E\r\nHas it really?0\r\n"
- "]\n" % self.test.id())
+ "]\n" % self.test.id()))
def test_progress_set(self):
self.protocol.progress(23, subunit.PROGRESS_SET)
- self.assertEqual(self.io.getvalue(), 'progress: 23\n')
+ self.assertEqual(self.io.getvalue(), _b('progress: 23\n'))
def test_progress_neg_cur(self):
self.protocol.progress(-23, subunit.PROGRESS_CUR)
- self.assertEqual(self.io.getvalue(), 'progress: -23\n')
+ self.assertEqual(self.io.getvalue(), _b('progress: -23\n'))
def test_progress_pos_cur(self):
self.protocol.progress(23, subunit.PROGRESS_CUR)
- self.assertEqual(self.io.getvalue(), 'progress: +23\n')
+ self.assertEqual(self.io.getvalue(), _b('progress: +23\n'))
def test_progress_pop(self):
self.protocol.progress(1234, subunit.PROGRESS_POP)
- self.assertEqual(self.io.getvalue(), 'progress: pop\n')
+ self.assertEqual(self.io.getvalue(), _b('progress: pop\n'))
def test_progress_push(self):
self.protocol.progress(1234, subunit.PROGRESS_PUSH)
- self.assertEqual(self.io.getvalue(), 'progress: push\n')
+ self.assertEqual(self.io.getvalue(), _b('progress: push\n'))
def test_time(self):
# Calling time() outputs a time signal immediately.
self.protocol.time(
datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc()))
self.assertEqual(
- "time: 2009-10-11 12:13:14.000015Z\n",
+ _b("time: 2009-10-11 12:13:14.000015Z\n"),
self.io.getvalue())
def test_add_unexpected_success(self):
"""Test addUnexpectedSuccess on a TestProtocolClient."""
self.protocol.addUnexpectedSuccess(self.test)
self.assertEqual(
- self.io.getvalue(), "successful: %s\n" % self.test.id())
+ self.io.getvalue(), _b("uxsuccess: %s\n" % self.test.id()))
def test_add_unexpected_success_details(self):
"""Test addUnexpectedSuccess on a TestProtocolClient with details."""
self.protocol.addUnexpectedSuccess(self.test, details=self.sample_details)
self.assertEqual(
- self.io.getvalue(), "successful: %s [ multipart\n"
+ self.io.getvalue(), _b("uxsuccess: %s [ multipart\n"
"Content-Type: text/plain\n"
"something\n"
- "F\r\nserialised\nform0\r\n]\n" % self.test.id())
+ "F\r\nserialised\nform0\r\n]\n" % self.test.id()))
+
+ def test_tags_empty(self):
+ self.protocol.tags(set(), set())
+ self.assertEqual(_b(""), self.io.getvalue())
+
+ def test_tags_add(self):
+ self.protocol.tags(set(['foo']), set())
+ self.assertEqual(_b("tags: foo\n"), self.io.getvalue())
+
+ def test_tags_both(self):
+ self.protocol.tags(set(['quux']), set(['bar']))
+ self.assertEqual(_b("tags: quux -bar\n"), self.io.getvalue())
+
+ def test_tags_gone(self):
+ self.protocol.tags(set(), set(['bar']))
+ self.assertEqual(_b("tags: -bar\n"), self.io.getvalue())
def test_suite():
diff --git a/python/subunit/tests/test_test_results.py b/python/subunit/tests/test_test_results.py
index 43155d5..94d2274 100644
--- a/python/subunit/tests/test_test_results.py
+++ b/python/subunit/tests/test_test_results.py
@@ -6,7 +6,7 @@
# license at the users choice. A copy of both licenses are available in the
# project source as Apache-2.0 and BSD. You may not use this file except in
# compliance with one of these two licences.
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -17,6 +17,9 @@
import datetime
import unittest
+from testtools import TestCase
+from testtools.testresult.doubles import ExtendedTestResult
+
import subunit
import subunit.iso8601 as iso8601
import subunit.test_results
@@ -76,22 +79,22 @@ class TestHookedTestResultDecorator(unittest.TestCase):
def test_startTest(self):
self.result.startTest(self)
-
+
def test_startTestRun(self):
self.result.startTestRun()
-
+
def test_stopTest(self):
self.result.stopTest(self)
-
+
def test_stopTestRun(self):
self.result.stopTestRun()
def test_addError(self):
self.result.addError(self, subunit.RemoteError())
-
+
def test_addError_details(self):
self.result.addError(self, details={})
-
+
def test_addFailure(self):
self.result.addFailure(self, subunit.RemoteError())
@@ -136,7 +139,7 @@ class TestHookedTestResultDecorator(unittest.TestCase):
def test_time(self):
self.result.time(None)
-
+
class TestAutoTimingTestResultDecorator(unittest.TestCase):
@@ -187,6 +190,110 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase):
self.assertNotEqual(None, self.decorated._calls[2])
+class TestTagCollapsingDecorator(TestCase):
+
+ def test_tags_forwarded_outside_of_tests(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+ tag_collapser.tags(set(['a', 'b']), set())
+ self.assertEquals(
+ [('tags', set(['a', 'b']), set([]))], result._events)
+
+ def test_tags_collapsed_inside_of_tests(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+ test = subunit.RemotedTestCase('foo')
+ tag_collapser.startTest(test)
+ tag_collapser.tags(set(['a']), set())
+ tag_collapser.tags(set(['b']), set(['a']))
+ tag_collapser.tags(set(['c']), set())
+ tag_collapser.stopTest(test)
+ self.assertEquals(
+ [('startTest', test),
+ ('tags', set(['b', 'c']), set(['a'])),
+ ('stopTest', test)],
+ result._events)
+
+ def test_tags_collapsed_inside_of_tests_different_ordering(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+ test = subunit.RemotedTestCase('foo')
+ tag_collapser.startTest(test)
+ tag_collapser.tags(set(), set(['a']))
+ tag_collapser.tags(set(['a', 'b']), set())
+ tag_collapser.tags(set(['c']), set())
+ tag_collapser.stopTest(test)
+ self.assertEquals(
+ [('startTest', test),
+ ('tags', set(['a', 'b', 'c']), set()),
+ ('stopTest', test)],
+ result._events)
+
+
+class TestTimeCollapsingDecorator(TestCase):
+
+ def make_time(self):
+ # Heh heh.
+ return datetime.datetime(
+ 2000, 1, self.getUniqueInteger(), tzinfo=iso8601.UTC)
+
+ def test_initial_time_forwarded(self):
+ # We always forward the first time event we see.
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ a_time = self.make_time()
+ tag_collapser.time(a_time)
+ self.assertEquals([('time', a_time)], result._events)
+
+ def test_time_collapsed_to_first_and_last(self):
+ # If there are many consecutive time events, only the first and last
+ # are sent through.
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ times = [self.make_time() for i in range(5)]
+ for a_time in times:
+ tag_collapser.time(a_time)
+ tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+ self.assertEquals(
+ [('time', times[0]), ('time', times[-1])], result._events[:-1])
+
+ def test_only_one_time_sent(self):
+ # If we receive a single time event followed by a non-time event, we
+ # send exactly one time event.
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ a_time = self.make_time()
+ tag_collapser.time(a_time)
+ tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+ self.assertEquals([('time', a_time)], result._events[:-1])
+
+ def test_duplicate_times_not_sent(self):
+ # Many time events with the exact same time are collapsed into one
+ # time event.
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ a_time = self.make_time()
+ for i in range(5):
+ tag_collapser.time(a_time)
+ tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+ self.assertEquals([('time', a_time)], result._events[:-1])
+
+ def test_no_times_inserted(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ a_time = self.make_time()
+ tag_collapser.time(a_time)
+ foo = subunit.RemotedTestCase('foo')
+ tag_collapser.startTest(foo)
+ tag_collapser.addSuccess(foo)
+ tag_collapser.stopTest(foo)
+ self.assertEquals(
+ [('time', a_time),
+ ('startTest', foo),
+ ('addSuccess', foo),
+ ('stopTest', foo)], result._events)
+
+
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)