diff options
author | Robert Collins <robertc@robertcollins.net> | 2014-01-13 13:08:43 +1300 |
---|---|---|
committer | Robert Collins <robertc@robertcollins.net> | 2014-01-13 13:08:43 +1300 |
commit | ffcdee8c14c8eec3f2f2c2015a4b3593df1b556a (patch) | |
tree | e87190b5d94d5683f7a1ab6bc3ebf0f424912df5 | |
parent | 3d0dc1aa86187a1bd2df007b6822d98c87000611 (diff) | |
parent | 3c660e191002c852bf579bbee92597a274c8ada8 (diff) | |
download | subunit-ffcdee8c14c8eec3f2f2c2015a4b3593df1b556a.tar.gz |
* Add ``subunit-output`` tool that can generate a Subunit v2 bytestream from
arguments passed on the command line. (Thomi Richards, #1252084)
-rw-r--r-- | INSTALL | 5 | ||||
-rw-r--r-- | Makefile.am | 7 | ||||
-rw-r--r-- | NEWS | 6 | ||||
-rw-r--r-- | filters/subunit-output | 23 | ||||
-rw-r--r-- | python/subunit/_output.py | 203 | ||||
-rw-r--r-- | python/subunit/tests/__init__.py | 6 | ||||
-rw-r--r-- | python/subunit/tests/test_output_filter.py | 596 | ||||
-rwxr-xr-x | setup.py | 8 |
8 files changed, 848 insertions, 6 deletions
@@ -14,9 +14,12 @@ Dependencies * Python for the filters * 'testtools' (On Debian and Ubuntu systems the 'python-testtools' package, the testtools package on pypi, or https://launchpad.net/testtools) for - the extended test API which permits attachments. Version 0.9.30 or newer is + the extended test API which permits attachments. Version 0.9.30 or newer is required. Of particular note, http://testtools.python-hosting.com/ is not the testtools you want. +* 'testscenarios' (On Debian and Ubuntu systems the 'python-testscenarios' + package, the 'testscenarios' package on pypi, or + https://launchpad.net/testscenarios) for running some of the python unit tests. * A C compiler for the C bindings * Perl for the Perl tools (including subunit-diff) * Check to run the subunit test suite. diff --git a/Makefile.am b/Makefile.am index ab63892..e8f018e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -27,9 +27,10 @@ EXTRA_DIST = \ python/subunit/tests/test_chunked.py \ python/subunit/tests/test_details.py \ python/subunit/tests/test_filters.py \ + python/subunit/tests/test_output_filter.py \ python/subunit/tests/test_progress_model.py \ - python/subunit/tests/test_subunit_filter.py \ python/subunit/tests/test_run.py \ + python/subunit/tests/test_subunit_filter.py \ python/subunit/tests/test_subunit_stats.py \ python/subunit/tests/test_subunit_tags.py \ python/subunit/tests/test_tap2subunit.py \ @@ -53,6 +54,7 @@ dist_bin_SCRIPTS = \ filters/subunit-filter \ filters/subunit-ls \ filters/subunit-notify \ + filters/subunit-output \ filters/subunit-stats \ filters/subunit-tags \ filters/subunit2csv \ @@ -78,7 +80,8 @@ pkgpython_PYTHON = \ python/subunit/progress_model.py \ python/subunit/run.py \ python/subunit/v2.py \ - python/subunit/test_results.py + python/subunit/test_results.py \ + python/subunit/_output.py lib_LTLIBRARIES = libsubunit.la lib_LTLIBRARIES += libcppunit_subunit.la @@ -5,6 +5,12 @@ subunit release notes NEXT (In development) --------------------- +IMPROVMENTS +~~~~~~~~~~~ + +* Add ``subunit-output`` tool that can generate a Subunit v2 bytestream from + arguments passed on the command line. (Thomi Richards, #1252084) + 0.0.16 ------ diff --git a/filters/subunit-output b/filters/subunit-output new file mode 100644 index 0000000..61e5d11 --- /dev/null +++ b/filters/subunit-output @@ -0,0 +1,23 @@ +#!/usr/bin/env python +# subunit: extensions to python unittest to get test results from subprocesses. +# Copyright (C) 2013 Subunit Contributors +# +# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +# license at the users choice. A copy of both licenses are available in the +# project source as Apache-2.0 and BSD. You may not use this file except in +# compliance with one of these two licences. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# license you chose for the specific language governing permissions and +# limitations under that license. + + +"""A command-line tool to generate a subunit result byte-stream.""" + +from subunit._output import output_main + + +if __name__ == '__main__': + exit(output_main()) diff --git a/python/subunit/_output.py b/python/subunit/_output.py new file mode 100644 index 0000000..aa92646 --- /dev/null +++ b/python/subunit/_output.py @@ -0,0 +1,203 @@ +# subunit: extensions to python unittest to get test results from subprocesses. +# Copyright (C) 2013 Subunit Contributors +# +# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +# license at the users choice. A copy of both licenses are available in the +# project source as Apache-2.0 and BSD. You may not use this file except in +# compliance with one of these two licences. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# license you chose for the specific language governing permissions and +# limitations under that license. +# + +import datetime +from functools import partial +from optparse import ( + OptionGroup, + OptionParser, + OptionValueError, +) +import sys + +from subunit import make_stream_binary +from subunit.iso8601 import UTC +from subunit.v2 import StreamResultToBytes + + +_FINAL_ACTIONS = frozenset([ + 'exists', + 'fail', + 'skip', + 'success', + 'uxsuccess', + 'xfail', +]) +_ALL_ACTIONS = _FINAL_ACTIONS.union(['inprogress']) +_CHUNK_SIZE=3670016 # 3.5 MiB + + +def output_main(): + args = parse_arguments() + output = StreamResultToBytes(sys.stdout) + generate_stream_results(args, output) + return 0 + + +def parse_arguments(args=None, ParserClass=OptionParser): + """Parse arguments from the command line. + + If specified, args must be a list of strings, similar to sys.argv[1:]. + + ParserClass may be specified to override the class we use to parse the + command-line arguments. This is useful for testing. + """ + parser = ParserClass( + prog="subunit-output", + description="A tool to generate a subunit v2 result byte-stream", + usage="subunit-output [-h] [status TEST_ID] [options]", + ) + parser.set_default('tags', None) + parser.set_default('test_id', None) + + status_commands = OptionGroup( + parser, + "Status Commands", + "These options report the status of a test. TEST_ID must be a string " + "that uniquely identifies the test." + ) + for action_name in _ALL_ACTIONS: + status_commands.add_option( + "--%s" % action_name, + nargs=1, + action="callback", + callback=set_status_cb, + callback_args=(action_name,), + dest="action", + metavar="TEST_ID", + help="Report a test status." + ) + parser.add_option_group(status_commands) + + file_commands = OptionGroup( + parser, + "File Options", + "These options control attaching data to a result stream. They can " + "either be specified with a status command, in which case the file " + "is attached to the test status, or by themselves, in which case " + "the file is attached to the stream (and not associated with any " + "test id)." + ) + file_commands.add_option( + "--attach-file", + help="Attach a file to the result stream for this test. If '-' is " + "specified, stdin will be read instead. In this case, the file " + "name will be set to 'stdin' (but can still be overridden with " + "the --file-name option)." + ) + file_commands.add_option( + "--file-name", + help="The name to give this file attachment. If not specified, the " + "name of the file on disk will be used, or 'stdin' in the case " + "where '-' was passed to the '--attach-file' argument. This option" + " may only be specified when '--attach-file' is specified.", + ) + file_commands.add_option( + "--mimetype", + help="The mime type to send with this file. This is only used if the " + "--attach-file argument is used. This argument is optional. If it " + "is not specified, the file will be sent without a mime type. This " + "option may only be specified when '--attach-file' is specified.", + default=None + ) + parser.add_option_group(file_commands) + + parser.add_option( + "--tag", + help="Specifies a tag. May be used multiple times", + action="append", + dest="tags", + default=[] + ) + + (options, args) = parser.parse_args(args) + if options.mimetype and not options.attach_file: + parser.error("Cannot specify --mimetype without --attach-file") + if options.file_name and not options.attach_file: + parser.error("Cannot specify --file-name without --attach-file") + if options.attach_file: + if options.attach_file == '-': + if not options.file_name: + options.file_name = 'stdin' + options.attach_file = make_stream_binary(sys.stdin) + else: + try: + options.attach_file = open(options.attach_file, 'rb') + except IOError as e: + parser.error("Cannot open %s (%s)" % (options.attach_file, e.strerror)) + + return options + + +def set_status_cb(option, opt_str, value, parser, status_name): + if getattr(parser.values, "action", None) is not None: + raise OptionValueError("argument %s: Only one status may be specified at once." % opt_str) + + if len(parser.rargs) == 0: + raise OptionValueError("argument %s: must specify a single TEST_ID." % opt_str) + parser.values.action = status_name + parser.values.test_id = parser.rargs.pop(0) + + +def generate_stream_results(args, output_writer): + output_writer.startTestRun() + + if args.attach_file: + reader = partial(args.attach_file.read, _CHUNK_SIZE) + this_file_hunk = reader() + next_file_hunk = reader() + + is_first_packet = True + is_last_packet = False + while not is_last_packet: + write_status = output_writer.status + + if is_first_packet: + if args.attach_file: + if args.mimetype: + write_status = partial(write_status, mime_type=args.mimetype) + if args.tags: + write_status = partial(write_status, test_tags=set(args.tags)) + write_status = partial(write_status, timestamp=create_timestamp()) + if args.action not in _FINAL_ACTIONS: + write_status = partial(write_status, test_status=args.action) + is_first_packet = False + + if args.attach_file: + filename = args.file_name or args.attach_file.name + write_status = partial(write_status, file_name=filename, file_bytes=this_file_hunk) + if next_file_hunk == b'': + write_status = partial(write_status, eof=True) + is_last_packet = True + else: + this_file_hunk = next_file_hunk + next_file_hunk = reader() + else: + is_last_packet = True + + if args.test_id: + write_status = partial(write_status, test_id=args.test_id) + + if is_last_packet: + if args.action in _FINAL_ACTIONS: + write_status = partial(write_status, test_status=args.action) + + write_status() + + output_writer.stopTestRun() + + +def create_timestamp(): + return datetime.datetime.now(UTC) diff --git a/python/subunit/tests/__init__.py b/python/subunit/tests/__init__.py index b45d7f9..c1c2c64 100644 --- a/python/subunit/tests/__init__.py +++ b/python/subunit/tests/__init__.py @@ -17,6 +17,8 @@ import sys from unittest import TestLoader +from testscenarios import generate_scenarios + # Before the test module imports to avoid circularity. # For testing: different pythons have different str() implementations. @@ -34,6 +36,7 @@ from subunit.tests import ( test_chunked, test_details, test_filters, + test_output_filter, test_progress_model, test_run, test_subunit_filter, @@ -60,4 +63,7 @@ def test_suite(): result.addTest(loader.loadTestsFromModule(test_subunit_tags)) result.addTest(loader.loadTestsFromModule(test_subunit_stats)) result.addTest(loader.loadTestsFromModule(test_run)) + result.addTests( + generate_scenarios(loader.loadTestsFromModule(test_output_filter)) + ) return result diff --git a/python/subunit/tests/test_output_filter.py b/python/subunit/tests/test_output_filter.py new file mode 100644 index 0000000..0f61ac5 --- /dev/null +++ b/python/subunit/tests/test_output_filter.py @@ -0,0 +1,596 @@ +# +# subunit: extensions to python unittest to get test results from subprocesses. +# Copyright (C) 2013 Subunit Contributors +# +# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +# license at the users choice. A copy of both licenses are available in the +# project source as Apache-2.0 and BSD. You may not use this file except in +# compliance with one of these two licences. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# license you chose for the specific language governing permissions and +# limitations under that license. +# + +import datetime +from functools import partial +from io import BytesIO, StringIO, TextIOWrapper +import optparse +import sys +from tempfile import NamedTemporaryFile + +from contextlib import contextmanager +from testtools import TestCase +from testtools.compat import _u +from testtools.matchers import ( + Equals, + Matcher, + MatchesAny, + MatchesListwise, + Mismatch, + raises, +) +from testtools.testresult.doubles import StreamResult + +from subunit.iso8601 import UTC +from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult +from subunit._output import ( + _ALL_ACTIONS, + _FINAL_ACTIONS, + generate_stream_results, + parse_arguments, +) +import subunit._output as _o + + +class SafeOptionParser(optparse.OptionParser): + """An ArgumentParser class that doesn't call sys.exit.""" + + def exit(self, status=0, message=""): + raise RuntimeError(message) + + def error(self, message): + raise RuntimeError(message) + + +safe_parse_arguments = partial(parse_arguments, ParserClass=SafeOptionParser) + + +class TestStatusArgParserTests(TestCase): + + scenarios = [ + (cmd, dict(command=cmd, option='--' + cmd)) for cmd in _ALL_ACTIONS + ] + + def test_can_parse_all_commands_with_test_id(self): + test_id = self.getUniqueString() + args = safe_parse_arguments(args=[self.option, test_id]) + + self.assertThat(args.action, Equals(self.command)) + self.assertThat(args.test_id, Equals(test_id)) + + def test_all_commands_parse_file_attachment(self): + with NamedTemporaryFile() as tmp_file: + args = safe_parse_arguments( + args=[self.option, 'foo', '--attach-file', tmp_file.name] + ) + self.assertThat(args.attach_file.name, Equals(tmp_file.name)) + + def test_all_commands_accept_mimetype_argument(self): + with NamedTemporaryFile() as tmp_file: + args = safe_parse_arguments( + args=[self.option, 'foo', '--attach-file', tmp_file.name, '--mimetype', "text/plain"] + ) + self.assertThat(args.mimetype, Equals("text/plain")) + + def test_all_commands_accept_file_name_argument(self): + with NamedTemporaryFile() as tmp_file: + args = safe_parse_arguments( + args=[self.option, 'foo', '--attach-file', tmp_file.name, '--file-name', "foo"] + ) + self.assertThat(args.file_name, Equals("foo")) + + def test_all_commands_accept_tags_argument(self): + args = safe_parse_arguments( + args=[self.option, 'foo', '--tag', "foo", "--tag", "bar", "--tag", "baz"] + ) + self.assertThat(args.tags, Equals(["foo", "bar", "baz"])) + + def test_attach_file_with_hyphen_opens_stdin(self): + self.patch(_o.sys, 'stdin', TextIOWrapper(BytesIO(b"Hello"))) + args = safe_parse_arguments( + args=[self.option, "foo", "--attach-file", "-"] + ) + + self.assertThat(args.attach_file.read(), Equals(b"Hello")) + + def test_attach_file_with_hyphen_sets_filename_to_stdin(self): + args = safe_parse_arguments( + args=[self.option, "foo", "--attach-file", "-"] + ) + + self.assertThat(args.file_name, Equals("stdin")) + + def test_can_override_stdin_filename(self): + args = safe_parse_arguments( + args=[self.option, "foo", "--attach-file", "-", '--file-name', 'foo'] + ) + + self.assertThat(args.file_name, Equals("foo")) + + def test_requires_test_id(self): + fn = lambda: safe_parse_arguments(args=[self.option]) + self.assertThat( + fn, + raises(RuntimeError('argument %s: must specify a single TEST_ID.' % self.option)) + ) + + +class ArgParserTests(TestCase): + + def test_can_parse_attach_file_without_test_id(self): + with NamedTemporaryFile() as tmp_file: + args = safe_parse_arguments( + args=["--attach-file", tmp_file.name] + ) + self.assertThat(args.attach_file.name, Equals(tmp_file.name)) + + def test_can_run_without_args(self): + args = safe_parse_arguments([]) + + def test_cannot_specify_more_than_one_status_command(self): + fn = lambda: safe_parse_arguments(['--fail', 'foo', '--skip', 'bar']) + self.assertThat( + fn, + raises(RuntimeError('argument --skip: Only one status may be specified at once.')) + ) + + def test_cannot_specify_mimetype_without_attach_file(self): + fn = lambda: safe_parse_arguments(['--mimetype', 'foo']) + self.assertThat( + fn, + raises(RuntimeError('Cannot specify --mimetype without --attach-file')) + ) + + def test_cannot_specify_filename_without_attach_file(self): + fn = lambda: safe_parse_arguments(['--file-name', 'foo']) + self.assertThat( + fn, + raises(RuntimeError('Cannot specify --file-name without --attach-file')) + ) + + def test_can_specify_tags_without_status_command(self): + args = safe_parse_arguments(['--tag', 'foo']) + self.assertEqual(['foo'], args.tags) + + def test_must_specify_tags_with_tags_options(self): + fn = lambda: safe_parse_arguments(['--fail', 'foo', '--tag']) + self.assertThat( + fn, + MatchesAny( + raises(RuntimeError('--tag option requires 1 argument')), + raises(RuntimeError('--tag option requires an argument')), + ) + ) + +def get_result_for(commands): + """Get a result object from *commands. + + Runs the 'generate_stream_results' function from subunit._output after + parsing *commands as if they were specified on the command line. The + resulting bytestream is then converted back into a result object and + returned. + """ + result = StreamResult() + args = safe_parse_arguments(commands) + generate_stream_results(args, result) + return result + + +@contextmanager +def temp_file_contents(data): + """Create a temporary file on disk containing 'data'.""" + with NamedTemporaryFile() as f: + f.write(data) + f.seek(0) + yield f + + +class StatusStreamResultTests(TestCase): + + scenarios = [ + (s, dict(status=s, option='--' + s)) for s in _ALL_ACTIONS + ] + + _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC) + + def setUp(self): + super(StatusStreamResultTests, self).setUp() + self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp) + self.test_id = self.getUniqueString() + + def test_only_one_packet_is_generated(self): + result = get_result_for([self.option, self.test_id]) + self.assertThat( + len(result._events), + Equals(3) # startTestRun and stopTestRun are also called, making 3 total. + ) + + def test_correct_status_is_generated(self): + result = get_result_for([self.option, self.test_id]) + + self.assertThat( + result._events[1], + MatchesStatusCall(test_status=self.status) + ) + + def test_all_commands_generate_tags(self): + result = get_result_for([self.option, self.test_id, '--tag', 'hello', '--tag', 'world']) + self.assertThat( + result._events[1], + MatchesStatusCall(test_tags=set(['hello', 'world'])) + ) + + def test_all_commands_generate_timestamp(self): + result = get_result_for([self.option, self.test_id]) + + self.assertThat( + result._events[1], + MatchesStatusCall(timestamp=self._dummy_timestamp) + ) + + def test_all_commands_generate_correct_test_id(self): + result = get_result_for([self.option, self.test_id]) + + self.assertThat( + result._events[1], + MatchesStatusCall(test_id=self.test_id) + ) + + def test_file_is_sent_in_single_packet(self): + with temp_file_contents(b"Hello") as f: + result = get_result_for([self.option, self.test_id, '--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(file_bytes=b'Hello', eof=True), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_can_read_binary_files(self): + with temp_file_contents(b"\xDE\xAD\xBE\xEF") as f: + result = get_result_for([self.option, self.test_id, '--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(file_bytes=b"\xDE\xAD\xBE\xEF", eof=True), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_can_read_empty_files(self): + with temp_file_contents(b"") as f: + result = get_result_for([self.option, self.test_id, '--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(file_bytes=b"", file_name=f.name, eof=True), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_can_read_stdin(self): + self.patch(_o.sys, 'stdin', TextIOWrapper(BytesIO(b"\xFE\xED\xFA\xCE"))) + result = get_result_for([self.option, self.test_id, '--attach-file', '-']) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(file_bytes=b"\xFE\xED\xFA\xCE", file_name='stdin', eof=True), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_file_is_sent_with_test_id(self): + with temp_file_contents(b"Hello") as f: + result = get_result_for([self.option, self.test_id, '--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(test_id=self.test_id, file_bytes=b'Hello', eof=True), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_file_is_sent_with_test_status(self): + with temp_file_contents(b"Hello") as f: + result = get_result_for([self.option, self.test_id, '--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(test_status=self.status, file_bytes=b'Hello', eof=True), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_file_chunk_size_is_honored(self): + with temp_file_contents(b"Hello") as f: + self.patch(_o, '_CHUNK_SIZE', 1) + result = get_result_for([self.option, self.test_id, '--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(test_id=self.test_id, file_bytes=b'H', eof=False), + MatchesStatusCall(test_id=self.test_id, file_bytes=b'e', eof=False), + MatchesStatusCall(test_id=self.test_id, file_bytes=b'l', eof=False), + MatchesStatusCall(test_id=self.test_id, file_bytes=b'l', eof=False), + MatchesStatusCall(test_id=self.test_id, file_bytes=b'o', eof=True), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_file_mimetype_specified_once_only(self): + with temp_file_contents(b"Hi") as f: + self.patch(_o, '_CHUNK_SIZE', 1) + result = get_result_for([ + self.option, + self.test_id, + '--attach-file', + f.name, + '--mimetype', + 'text/plain', + ]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(test_id=self.test_id, mime_type='text/plain', file_bytes=b'H', eof=False), + MatchesStatusCall(test_id=self.test_id, mime_type=None, file_bytes=b'i', eof=True), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_tags_specified_once_only(self): + with temp_file_contents(b"Hi") as f: + self.patch(_o, '_CHUNK_SIZE', 1) + result = get_result_for([ + self.option, + self.test_id, + '--attach-file', + f.name, + '--tag', + 'foo', + '--tag', + 'bar', + ]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(test_id=self.test_id, test_tags=set(['foo', 'bar'])), + MatchesStatusCall(test_id=self.test_id, test_tags=None), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_timestamp_specified_once_only(self): + with temp_file_contents(b"Hi") as f: + self.patch(_o, '_CHUNK_SIZE', 1) + result = get_result_for([ + self.option, + self.test_id, + '--attach-file', + f.name, + ]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(test_id=self.test_id, timestamp=self._dummy_timestamp), + MatchesStatusCall(test_id=self.test_id, timestamp=None), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_test_status_specified_once_only(self): + with temp_file_contents(b"Hi") as f: + self.patch(_o, '_CHUNK_SIZE', 1) + result = get_result_for([ + self.option, + self.test_id, + '--attach-file', + f.name, + ]) + + # 'inprogress' status should be on the first packet only, all other + # statuses should be on the last packet. + if self.status in _FINAL_ACTIONS: + first_call = MatchesStatusCall(test_id=self.test_id, test_status=None) + last_call = MatchesStatusCall(test_id=self.test_id, test_status=self.status) + else: + first_call = MatchesStatusCall(test_id=self.test_id, test_status=self.status) + last_call = MatchesStatusCall(test_id=self.test_id, test_status=None) + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + first_call, + last_call, + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_filename_can_be_overridden(self): + with temp_file_contents(b"Hello") as f: + specified_file_name = self.getUniqueString() + result = get_result_for([ + self.option, + self.test_id, + '--attach-file', + f.name, + '--file-name', + specified_file_name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(file_name=specified_file_name, file_bytes=b'Hello'), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_file_name_is_used_by_default(self): + with temp_file_contents(b"Hello") as f: + result = get_result_for([self.option, self.test_id, '--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(file_name=f.name, file_bytes=b'Hello', eof=True), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + +class FileDataTests(TestCase): + + def test_can_attach_file_without_test_id(self): + with temp_file_contents(b"Hello") as f: + result = get_result_for(['--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(test_id=None, file_bytes=b'Hello', eof=True), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_file_name_is_used_by_default(self): + with temp_file_contents(b"Hello") as f: + result = get_result_for(['--attach-file', f.name]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(file_name=f.name, file_bytes=b'Hello', eof=True), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_filename_can_be_overridden(self): + with temp_file_contents(b"Hello") as f: + specified_file_name = self.getUniqueString() + result = get_result_for([ + '--attach-file', + f.name, + '--file-name', + specified_file_name + ]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(file_name=specified_file_name, file_bytes=b'Hello'), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_files_have_timestamp(self): + _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC) + self.patch(_o, 'create_timestamp', lambda: _dummy_timestamp) + + with temp_file_contents(b"Hello") as f: + specified_file_name = self.getUniqueString() + result = get_result_for([ + '--attach-file', + f.name, + ]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(file_bytes=b'Hello', timestamp=_dummy_timestamp), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + def test_can_specify_tags_without_test_status(self): + result = get_result_for([ + '--tag', + 'foo', + ]) + + self.assertThat( + result._events, + MatchesListwise([ + MatchesStatusCall(call='startTestRun'), + MatchesStatusCall(test_tags=set(['foo'])), + MatchesStatusCall(call='stopTestRun'), + ]) + ) + + +class MatchesStatusCall(Matcher): + + _position_lookup = { + 'call': 0, + 'test_id': 1, + 'test_status': 2, + 'test_tags': 3, + 'runnable': 4, + 'file_name': 5, + 'file_bytes': 6, + 'eof': 7, + 'mime_type': 8, + 'route_code': 9, + 'timestamp': 10, + } + + def __init__(self, **kwargs): + unknown_kwargs = list(filter( + lambda k: k not in self._position_lookup, + kwargs + )) + if unknown_kwargs: + raise ValueError("Unknown keywords: %s" % ','.join(unknown_kwargs)) + self._filters = kwargs + + def match(self, call_tuple): + for k, v in self._filters.items(): + try: + pos = self._position_lookup[k] + if call_tuple[pos] != v: + return Mismatch( + "Value for key is %r, not %r" % (call_tuple[pos], v) + ) + except IndexError: + return Mismatch("Key %s is not present." % k) + + def __str__(self): + return "<MatchesStatusCall %r>" % self._filters @@ -10,6 +10,7 @@ else: extra = { 'install_requires': [ 'extras', + 'testscenarios', 'testtools>=0.9.34', ] } @@ -52,14 +53,15 @@ setup( scripts = [ 'filters/subunit-1to2', 'filters/subunit-2to1', - 'filters/subunit2gtk', - 'filters/subunit2junitxml', - 'filters/subunit2pyunit', 'filters/subunit-filter', 'filters/subunit-ls', 'filters/subunit-notify', + 'filters/subunit-output', 'filters/subunit-stats', 'filters/subunit-tags', + 'filters/subunit2gtk', + 'filters/subunit2junitxml', + 'filters/subunit2pyunit', 'filters/tap2subunit', ], **extra |