summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Collins <robertc@robertcollins.net>2014-01-13 13:08:43 +1300
committerRobert Collins <robertc@robertcollins.net>2014-01-13 13:08:43 +1300
commitffcdee8c14c8eec3f2f2c2015a4b3593df1b556a (patch)
treee87190b5d94d5683f7a1ab6bc3ebf0f424912df5
parent3d0dc1aa86187a1bd2df007b6822d98c87000611 (diff)
parent3c660e191002c852bf579bbee92597a274c8ada8 (diff)
downloadsubunit-ffcdee8c14c8eec3f2f2c2015a4b3593df1b556a.tar.gz
* Add ``subunit-output`` tool that can generate a Subunit v2 bytestream from
arguments passed on the command line. (Thomi Richards, #1252084)
-rw-r--r--INSTALL5
-rw-r--r--Makefile.am7
-rw-r--r--NEWS6
-rw-r--r--filters/subunit-output23
-rw-r--r--python/subunit/_output.py203
-rw-r--r--python/subunit/tests/__init__.py6
-rw-r--r--python/subunit/tests/test_output_filter.py596
-rwxr-xr-xsetup.py8
8 files changed, 848 insertions, 6 deletions
diff --git a/INSTALL b/INSTALL
index 51c0e79..29052eb 100644
--- a/INSTALL
+++ b/INSTALL
@@ -14,9 +14,12 @@ Dependencies
* Python for the filters
* 'testtools' (On Debian and Ubuntu systems the 'python-testtools' package,
the testtools package on pypi, or https://launchpad.net/testtools) for
- the extended test API which permits attachments. Version 0.9.30 or newer is
+ the extended test API which permits attachments. Version 0.9.30 or newer is
required. Of particular note, http://testtools.python-hosting.com/ is not
the testtools you want.
+* 'testscenarios' (On Debian and Ubuntu systems the 'python-testscenarios'
+ package, the 'testscenarios' package on pypi, or
+ https://launchpad.net/testscenarios) for running some of the python unit tests.
* A C compiler for the C bindings
* Perl for the Perl tools (including subunit-diff)
* Check to run the subunit test suite.
diff --git a/Makefile.am b/Makefile.am
index ab63892..e8f018e 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -27,9 +27,10 @@ EXTRA_DIST = \
python/subunit/tests/test_chunked.py \
python/subunit/tests/test_details.py \
python/subunit/tests/test_filters.py \
+ python/subunit/tests/test_output_filter.py \
python/subunit/tests/test_progress_model.py \
- python/subunit/tests/test_subunit_filter.py \
python/subunit/tests/test_run.py \
+ python/subunit/tests/test_subunit_filter.py \
python/subunit/tests/test_subunit_stats.py \
python/subunit/tests/test_subunit_tags.py \
python/subunit/tests/test_tap2subunit.py \
@@ -53,6 +54,7 @@ dist_bin_SCRIPTS = \
filters/subunit-filter \
filters/subunit-ls \
filters/subunit-notify \
+ filters/subunit-output \
filters/subunit-stats \
filters/subunit-tags \
filters/subunit2csv \
@@ -78,7 +80,8 @@ pkgpython_PYTHON = \
python/subunit/progress_model.py \
python/subunit/run.py \
python/subunit/v2.py \
- python/subunit/test_results.py
+ python/subunit/test_results.py \
+ python/subunit/_output.py
lib_LTLIBRARIES = libsubunit.la
lib_LTLIBRARIES += libcppunit_subunit.la
diff --git a/NEWS b/NEWS
index 59af931..b9ac16c 100644
--- a/NEWS
+++ b/NEWS
@@ -5,6 +5,12 @@ subunit release notes
NEXT (In development)
---------------------
+IMPROVMENTS
+~~~~~~~~~~~
+
+* Add ``subunit-output`` tool that can generate a Subunit v2 bytestream from
+ arguments passed on the command line. (Thomi Richards, #1252084)
+
0.0.16
------
diff --git a/filters/subunit-output b/filters/subunit-output
new file mode 100644
index 0000000..61e5d11
--- /dev/null
+++ b/filters/subunit-output
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+# subunit: extensions to python unittest to get test results from subprocesses.
+# Copyright (C) 2013 Subunit Contributors
+#
+# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
+# license at the users choice. A copy of both licenses are available in the
+# project source as Apache-2.0 and BSD. You may not use this file except in
+# compliance with one of these two licences.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# license you chose for the specific language governing permissions and
+# limitations under that license.
+
+
+"""A command-line tool to generate a subunit result byte-stream."""
+
+from subunit._output import output_main
+
+
+if __name__ == '__main__':
+ exit(output_main())
diff --git a/python/subunit/_output.py b/python/subunit/_output.py
new file mode 100644
index 0000000..aa92646
--- /dev/null
+++ b/python/subunit/_output.py
@@ -0,0 +1,203 @@
+# subunit: extensions to python unittest to get test results from subprocesses.
+# Copyright (C) 2013 Subunit Contributors
+#
+# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
+# license at the users choice. A copy of both licenses are available in the
+# project source as Apache-2.0 and BSD. You may not use this file except in
+# compliance with one of these two licences.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# license you chose for the specific language governing permissions and
+# limitations under that license.
+#
+
+import datetime
+from functools import partial
+from optparse import (
+ OptionGroup,
+ OptionParser,
+ OptionValueError,
+)
+import sys
+
+from subunit import make_stream_binary
+from subunit.iso8601 import UTC
+from subunit.v2 import StreamResultToBytes
+
+
+_FINAL_ACTIONS = frozenset([
+ 'exists',
+ 'fail',
+ 'skip',
+ 'success',
+ 'uxsuccess',
+ 'xfail',
+])
+_ALL_ACTIONS = _FINAL_ACTIONS.union(['inprogress'])
+_CHUNK_SIZE=3670016 # 3.5 MiB
+
+
+def output_main():
+ args = parse_arguments()
+ output = StreamResultToBytes(sys.stdout)
+ generate_stream_results(args, output)
+ return 0
+
+
+def parse_arguments(args=None, ParserClass=OptionParser):
+ """Parse arguments from the command line.
+
+ If specified, args must be a list of strings, similar to sys.argv[1:].
+
+ ParserClass may be specified to override the class we use to parse the
+ command-line arguments. This is useful for testing.
+ """
+ parser = ParserClass(
+ prog="subunit-output",
+ description="A tool to generate a subunit v2 result byte-stream",
+ usage="subunit-output [-h] [status TEST_ID] [options]",
+ )
+ parser.set_default('tags', None)
+ parser.set_default('test_id', None)
+
+ status_commands = OptionGroup(
+ parser,
+ "Status Commands",
+ "These options report the status of a test. TEST_ID must be a string "
+ "that uniquely identifies the test."
+ )
+ for action_name in _ALL_ACTIONS:
+ status_commands.add_option(
+ "--%s" % action_name,
+ nargs=1,
+ action="callback",
+ callback=set_status_cb,
+ callback_args=(action_name,),
+ dest="action",
+ metavar="TEST_ID",
+ help="Report a test status."
+ )
+ parser.add_option_group(status_commands)
+
+ file_commands = OptionGroup(
+ parser,
+ "File Options",
+ "These options control attaching data to a result stream. They can "
+ "either be specified with a status command, in which case the file "
+ "is attached to the test status, or by themselves, in which case "
+ "the file is attached to the stream (and not associated with any "
+ "test id)."
+ )
+ file_commands.add_option(
+ "--attach-file",
+ help="Attach a file to the result stream for this test. If '-' is "
+ "specified, stdin will be read instead. In this case, the file "
+ "name will be set to 'stdin' (but can still be overridden with "
+ "the --file-name option)."
+ )
+ file_commands.add_option(
+ "--file-name",
+ help="The name to give this file attachment. If not specified, the "
+ "name of the file on disk will be used, or 'stdin' in the case "
+ "where '-' was passed to the '--attach-file' argument. This option"
+ " may only be specified when '--attach-file' is specified.",
+ )
+ file_commands.add_option(
+ "--mimetype",
+ help="The mime type to send with this file. This is only used if the "
+ "--attach-file argument is used. This argument is optional. If it "
+ "is not specified, the file will be sent without a mime type. This "
+ "option may only be specified when '--attach-file' is specified.",
+ default=None
+ )
+ parser.add_option_group(file_commands)
+
+ parser.add_option(
+ "--tag",
+ help="Specifies a tag. May be used multiple times",
+ action="append",
+ dest="tags",
+ default=[]
+ )
+
+ (options, args) = parser.parse_args(args)
+ if options.mimetype and not options.attach_file:
+ parser.error("Cannot specify --mimetype without --attach-file")
+ if options.file_name and not options.attach_file:
+ parser.error("Cannot specify --file-name without --attach-file")
+ if options.attach_file:
+ if options.attach_file == '-':
+ if not options.file_name:
+ options.file_name = 'stdin'
+ options.attach_file = make_stream_binary(sys.stdin)
+ else:
+ try:
+ options.attach_file = open(options.attach_file, 'rb')
+ except IOError as e:
+ parser.error("Cannot open %s (%s)" % (options.attach_file, e.strerror))
+
+ return options
+
+
+def set_status_cb(option, opt_str, value, parser, status_name):
+ if getattr(parser.values, "action", None) is not None:
+ raise OptionValueError("argument %s: Only one status may be specified at once." % opt_str)
+
+ if len(parser.rargs) == 0:
+ raise OptionValueError("argument %s: must specify a single TEST_ID." % opt_str)
+ parser.values.action = status_name
+ parser.values.test_id = parser.rargs.pop(0)
+
+
+def generate_stream_results(args, output_writer):
+ output_writer.startTestRun()
+
+ if args.attach_file:
+ reader = partial(args.attach_file.read, _CHUNK_SIZE)
+ this_file_hunk = reader()
+ next_file_hunk = reader()
+
+ is_first_packet = True
+ is_last_packet = False
+ while not is_last_packet:
+ write_status = output_writer.status
+
+ if is_first_packet:
+ if args.attach_file:
+ if args.mimetype:
+ write_status = partial(write_status, mime_type=args.mimetype)
+ if args.tags:
+ write_status = partial(write_status, test_tags=set(args.tags))
+ write_status = partial(write_status, timestamp=create_timestamp())
+ if args.action not in _FINAL_ACTIONS:
+ write_status = partial(write_status, test_status=args.action)
+ is_first_packet = False
+
+ if args.attach_file:
+ filename = args.file_name or args.attach_file.name
+ write_status = partial(write_status, file_name=filename, file_bytes=this_file_hunk)
+ if next_file_hunk == b'':
+ write_status = partial(write_status, eof=True)
+ is_last_packet = True
+ else:
+ this_file_hunk = next_file_hunk
+ next_file_hunk = reader()
+ else:
+ is_last_packet = True
+
+ if args.test_id:
+ write_status = partial(write_status, test_id=args.test_id)
+
+ if is_last_packet:
+ if args.action in _FINAL_ACTIONS:
+ write_status = partial(write_status, test_status=args.action)
+
+ write_status()
+
+ output_writer.stopTestRun()
+
+
+def create_timestamp():
+ return datetime.datetime.now(UTC)
diff --git a/python/subunit/tests/__init__.py b/python/subunit/tests/__init__.py
index b45d7f9..c1c2c64 100644
--- a/python/subunit/tests/__init__.py
+++ b/python/subunit/tests/__init__.py
@@ -17,6 +17,8 @@
import sys
from unittest import TestLoader
+from testscenarios import generate_scenarios
+
# Before the test module imports to avoid circularity.
# For testing: different pythons have different str() implementations.
@@ -34,6 +36,7 @@ from subunit.tests import (
test_chunked,
test_details,
test_filters,
+ test_output_filter,
test_progress_model,
test_run,
test_subunit_filter,
@@ -60,4 +63,7 @@ def test_suite():
result.addTest(loader.loadTestsFromModule(test_subunit_tags))
result.addTest(loader.loadTestsFromModule(test_subunit_stats))
result.addTest(loader.loadTestsFromModule(test_run))
+ result.addTests(
+ generate_scenarios(loader.loadTestsFromModule(test_output_filter))
+ )
return result
diff --git a/python/subunit/tests/test_output_filter.py b/python/subunit/tests/test_output_filter.py
new file mode 100644
index 0000000..0f61ac5
--- /dev/null
+++ b/python/subunit/tests/test_output_filter.py
@@ -0,0 +1,596 @@
+#
+# subunit: extensions to python unittest to get test results from subprocesses.
+# Copyright (C) 2013 Subunit Contributors
+#
+# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
+# license at the users choice. A copy of both licenses are available in the
+# project source as Apache-2.0 and BSD. You may not use this file except in
+# compliance with one of these two licences.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# license you chose for the specific language governing permissions and
+# limitations under that license.
+#
+
+import datetime
+from functools import partial
+from io import BytesIO, StringIO, TextIOWrapper
+import optparse
+import sys
+from tempfile import NamedTemporaryFile
+
+from contextlib import contextmanager
+from testtools import TestCase
+from testtools.compat import _u
+from testtools.matchers import (
+ Equals,
+ Matcher,
+ MatchesAny,
+ MatchesListwise,
+ Mismatch,
+ raises,
+)
+from testtools.testresult.doubles import StreamResult
+
+from subunit.iso8601 import UTC
+from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
+from subunit._output import (
+ _ALL_ACTIONS,
+ _FINAL_ACTIONS,
+ generate_stream_results,
+ parse_arguments,
+)
+import subunit._output as _o
+
+
+class SafeOptionParser(optparse.OptionParser):
+ """An ArgumentParser class that doesn't call sys.exit."""
+
+ def exit(self, status=0, message=""):
+ raise RuntimeError(message)
+
+ def error(self, message):
+ raise RuntimeError(message)
+
+
+safe_parse_arguments = partial(parse_arguments, ParserClass=SafeOptionParser)
+
+
+class TestStatusArgParserTests(TestCase):
+
+ scenarios = [
+ (cmd, dict(command=cmd, option='--' + cmd)) for cmd in _ALL_ACTIONS
+ ]
+
+ def test_can_parse_all_commands_with_test_id(self):
+ test_id = self.getUniqueString()
+ args = safe_parse_arguments(args=[self.option, test_id])
+
+ self.assertThat(args.action, Equals(self.command))
+ self.assertThat(args.test_id, Equals(test_id))
+
+ def test_all_commands_parse_file_attachment(self):
+ with NamedTemporaryFile() as tmp_file:
+ args = safe_parse_arguments(
+ args=[self.option, 'foo', '--attach-file', tmp_file.name]
+ )
+ self.assertThat(args.attach_file.name, Equals(tmp_file.name))
+
+ def test_all_commands_accept_mimetype_argument(self):
+ with NamedTemporaryFile() as tmp_file:
+ args = safe_parse_arguments(
+ args=[self.option, 'foo', '--attach-file', tmp_file.name, '--mimetype', "text/plain"]
+ )
+ self.assertThat(args.mimetype, Equals("text/plain"))
+
+ def test_all_commands_accept_file_name_argument(self):
+ with NamedTemporaryFile() as tmp_file:
+ args = safe_parse_arguments(
+ args=[self.option, 'foo', '--attach-file', tmp_file.name, '--file-name', "foo"]
+ )
+ self.assertThat(args.file_name, Equals("foo"))
+
+ def test_all_commands_accept_tags_argument(self):
+ args = safe_parse_arguments(
+ args=[self.option, 'foo', '--tag', "foo", "--tag", "bar", "--tag", "baz"]
+ )
+ self.assertThat(args.tags, Equals(["foo", "bar", "baz"]))
+
+ def test_attach_file_with_hyphen_opens_stdin(self):
+ self.patch(_o.sys, 'stdin', TextIOWrapper(BytesIO(b"Hello")))
+ args = safe_parse_arguments(
+ args=[self.option, "foo", "--attach-file", "-"]
+ )
+
+ self.assertThat(args.attach_file.read(), Equals(b"Hello"))
+
+ def test_attach_file_with_hyphen_sets_filename_to_stdin(self):
+ args = safe_parse_arguments(
+ args=[self.option, "foo", "--attach-file", "-"]
+ )
+
+ self.assertThat(args.file_name, Equals("stdin"))
+
+ def test_can_override_stdin_filename(self):
+ args = safe_parse_arguments(
+ args=[self.option, "foo", "--attach-file", "-", '--file-name', 'foo']
+ )
+
+ self.assertThat(args.file_name, Equals("foo"))
+
+ def test_requires_test_id(self):
+ fn = lambda: safe_parse_arguments(args=[self.option])
+ self.assertThat(
+ fn,
+ raises(RuntimeError('argument %s: must specify a single TEST_ID.' % self.option))
+ )
+
+
+class ArgParserTests(TestCase):
+
+ def test_can_parse_attach_file_without_test_id(self):
+ with NamedTemporaryFile() as tmp_file:
+ args = safe_parse_arguments(
+ args=["--attach-file", tmp_file.name]
+ )
+ self.assertThat(args.attach_file.name, Equals(tmp_file.name))
+
+ def test_can_run_without_args(self):
+ args = safe_parse_arguments([])
+
+ def test_cannot_specify_more_than_one_status_command(self):
+ fn = lambda: safe_parse_arguments(['--fail', 'foo', '--skip', 'bar'])
+ self.assertThat(
+ fn,
+ raises(RuntimeError('argument --skip: Only one status may be specified at once.'))
+ )
+
+ def test_cannot_specify_mimetype_without_attach_file(self):
+ fn = lambda: safe_parse_arguments(['--mimetype', 'foo'])
+ self.assertThat(
+ fn,
+ raises(RuntimeError('Cannot specify --mimetype without --attach-file'))
+ )
+
+ def test_cannot_specify_filename_without_attach_file(self):
+ fn = lambda: safe_parse_arguments(['--file-name', 'foo'])
+ self.assertThat(
+ fn,
+ raises(RuntimeError('Cannot specify --file-name without --attach-file'))
+ )
+
+ def test_can_specify_tags_without_status_command(self):
+ args = safe_parse_arguments(['--tag', 'foo'])
+ self.assertEqual(['foo'], args.tags)
+
+ def test_must_specify_tags_with_tags_options(self):
+ fn = lambda: safe_parse_arguments(['--fail', 'foo', '--tag'])
+ self.assertThat(
+ fn,
+ MatchesAny(
+ raises(RuntimeError('--tag option requires 1 argument')),
+ raises(RuntimeError('--tag option requires an argument')),
+ )
+ )
+
+def get_result_for(commands):
+ """Get a result object from *commands.
+
+ Runs the 'generate_stream_results' function from subunit._output after
+ parsing *commands as if they were specified on the command line. The
+ resulting bytestream is then converted back into a result object and
+ returned.
+ """
+ result = StreamResult()
+ args = safe_parse_arguments(commands)
+ generate_stream_results(args, result)
+ return result
+
+
+@contextmanager
+def temp_file_contents(data):
+ """Create a temporary file on disk containing 'data'."""
+ with NamedTemporaryFile() as f:
+ f.write(data)
+ f.seek(0)
+ yield f
+
+
+class StatusStreamResultTests(TestCase):
+
+ scenarios = [
+ (s, dict(status=s, option='--' + s)) for s in _ALL_ACTIONS
+ ]
+
+ _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
+
+ def setUp(self):
+ super(StatusStreamResultTests, self).setUp()
+ self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
+ self.test_id = self.getUniqueString()
+
+ def test_only_one_packet_is_generated(self):
+ result = get_result_for([self.option, self.test_id])
+ self.assertThat(
+ len(result._events),
+ Equals(3) # startTestRun and stopTestRun are also called, making 3 total.
+ )
+
+ def test_correct_status_is_generated(self):
+ result = get_result_for([self.option, self.test_id])
+
+ self.assertThat(
+ result._events[1],
+ MatchesStatusCall(test_status=self.status)
+ )
+
+ def test_all_commands_generate_tags(self):
+ result = get_result_for([self.option, self.test_id, '--tag', 'hello', '--tag', 'world'])
+ self.assertThat(
+ result._events[1],
+ MatchesStatusCall(test_tags=set(['hello', 'world']))
+ )
+
+ def test_all_commands_generate_timestamp(self):
+ result = get_result_for([self.option, self.test_id])
+
+ self.assertThat(
+ result._events[1],
+ MatchesStatusCall(timestamp=self._dummy_timestamp)
+ )
+
+ def test_all_commands_generate_correct_test_id(self):
+ result = get_result_for([self.option, self.test_id])
+
+ self.assertThat(
+ result._events[1],
+ MatchesStatusCall(test_id=self.test_id)
+ )
+
+ def test_file_is_sent_in_single_packet(self):
+ with temp_file_contents(b"Hello") as f:
+ result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(file_bytes=b'Hello', eof=True),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_can_read_binary_files(self):
+ with temp_file_contents(b"\xDE\xAD\xBE\xEF") as f:
+ result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(file_bytes=b"\xDE\xAD\xBE\xEF", eof=True),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_can_read_empty_files(self):
+ with temp_file_contents(b"") as f:
+ result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(file_bytes=b"", file_name=f.name, eof=True),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_can_read_stdin(self):
+ self.patch(_o.sys, 'stdin', TextIOWrapper(BytesIO(b"\xFE\xED\xFA\xCE")))
+ result = get_result_for([self.option, self.test_id, '--attach-file', '-'])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(file_bytes=b"\xFE\xED\xFA\xCE", file_name='stdin', eof=True),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_file_is_sent_with_test_id(self):
+ with temp_file_contents(b"Hello") as f:
+ result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(test_id=self.test_id, file_bytes=b'Hello', eof=True),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_file_is_sent_with_test_status(self):
+ with temp_file_contents(b"Hello") as f:
+ result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(test_status=self.status, file_bytes=b'Hello', eof=True),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_file_chunk_size_is_honored(self):
+ with temp_file_contents(b"Hello") as f:
+ self.patch(_o, '_CHUNK_SIZE', 1)
+ result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(test_id=self.test_id, file_bytes=b'H', eof=False),
+ MatchesStatusCall(test_id=self.test_id, file_bytes=b'e', eof=False),
+ MatchesStatusCall(test_id=self.test_id, file_bytes=b'l', eof=False),
+ MatchesStatusCall(test_id=self.test_id, file_bytes=b'l', eof=False),
+ MatchesStatusCall(test_id=self.test_id, file_bytes=b'o', eof=True),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_file_mimetype_specified_once_only(self):
+ with temp_file_contents(b"Hi") as f:
+ self.patch(_o, '_CHUNK_SIZE', 1)
+ result = get_result_for([
+ self.option,
+ self.test_id,
+ '--attach-file',
+ f.name,
+ '--mimetype',
+ 'text/plain',
+ ])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(test_id=self.test_id, mime_type='text/plain', file_bytes=b'H', eof=False),
+ MatchesStatusCall(test_id=self.test_id, mime_type=None, file_bytes=b'i', eof=True),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_tags_specified_once_only(self):
+ with temp_file_contents(b"Hi") as f:
+ self.patch(_o, '_CHUNK_SIZE', 1)
+ result = get_result_for([
+ self.option,
+ self.test_id,
+ '--attach-file',
+ f.name,
+ '--tag',
+ 'foo',
+ '--tag',
+ 'bar',
+ ])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(test_id=self.test_id, test_tags=set(['foo', 'bar'])),
+ MatchesStatusCall(test_id=self.test_id, test_tags=None),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_timestamp_specified_once_only(self):
+ with temp_file_contents(b"Hi") as f:
+ self.patch(_o, '_CHUNK_SIZE', 1)
+ result = get_result_for([
+ self.option,
+ self.test_id,
+ '--attach-file',
+ f.name,
+ ])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(test_id=self.test_id, timestamp=self._dummy_timestamp),
+ MatchesStatusCall(test_id=self.test_id, timestamp=None),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_test_status_specified_once_only(self):
+ with temp_file_contents(b"Hi") as f:
+ self.patch(_o, '_CHUNK_SIZE', 1)
+ result = get_result_for([
+ self.option,
+ self.test_id,
+ '--attach-file',
+ f.name,
+ ])
+
+ # 'inprogress' status should be on the first packet only, all other
+ # statuses should be on the last packet.
+ if self.status in _FINAL_ACTIONS:
+ first_call = MatchesStatusCall(test_id=self.test_id, test_status=None)
+ last_call = MatchesStatusCall(test_id=self.test_id, test_status=self.status)
+ else:
+ first_call = MatchesStatusCall(test_id=self.test_id, test_status=self.status)
+ last_call = MatchesStatusCall(test_id=self.test_id, test_status=None)
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ first_call,
+ last_call,
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_filename_can_be_overridden(self):
+ with temp_file_contents(b"Hello") as f:
+ specified_file_name = self.getUniqueString()
+ result = get_result_for([
+ self.option,
+ self.test_id,
+ '--attach-file',
+ f.name,
+ '--file-name',
+ specified_file_name])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(file_name=specified_file_name, file_bytes=b'Hello'),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_file_name_is_used_by_default(self):
+ with temp_file_contents(b"Hello") as f:
+ result = get_result_for([self.option, self.test_id, '--attach-file', f.name])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(file_name=f.name, file_bytes=b'Hello', eof=True),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+
+class FileDataTests(TestCase):
+
+ def test_can_attach_file_without_test_id(self):
+ with temp_file_contents(b"Hello") as f:
+ result = get_result_for(['--attach-file', f.name])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(test_id=None, file_bytes=b'Hello', eof=True),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_file_name_is_used_by_default(self):
+ with temp_file_contents(b"Hello") as f:
+ result = get_result_for(['--attach-file', f.name])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(file_name=f.name, file_bytes=b'Hello', eof=True),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_filename_can_be_overridden(self):
+ with temp_file_contents(b"Hello") as f:
+ specified_file_name = self.getUniqueString()
+ result = get_result_for([
+ '--attach-file',
+ f.name,
+ '--file-name',
+ specified_file_name
+ ])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(file_name=specified_file_name, file_bytes=b'Hello'),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_files_have_timestamp(self):
+ _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
+ self.patch(_o, 'create_timestamp', lambda: _dummy_timestamp)
+
+ with temp_file_contents(b"Hello") as f:
+ specified_file_name = self.getUniqueString()
+ result = get_result_for([
+ '--attach-file',
+ f.name,
+ ])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(file_bytes=b'Hello', timestamp=_dummy_timestamp),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+ def test_can_specify_tags_without_test_status(self):
+ result = get_result_for([
+ '--tag',
+ 'foo',
+ ])
+
+ self.assertThat(
+ result._events,
+ MatchesListwise([
+ MatchesStatusCall(call='startTestRun'),
+ MatchesStatusCall(test_tags=set(['foo'])),
+ MatchesStatusCall(call='stopTestRun'),
+ ])
+ )
+
+
+class MatchesStatusCall(Matcher):
+
+ _position_lookup = {
+ 'call': 0,
+ 'test_id': 1,
+ 'test_status': 2,
+ 'test_tags': 3,
+ 'runnable': 4,
+ 'file_name': 5,
+ 'file_bytes': 6,
+ 'eof': 7,
+ 'mime_type': 8,
+ 'route_code': 9,
+ 'timestamp': 10,
+ }
+
+ def __init__(self, **kwargs):
+ unknown_kwargs = list(filter(
+ lambda k: k not in self._position_lookup,
+ kwargs
+ ))
+ if unknown_kwargs:
+ raise ValueError("Unknown keywords: %s" % ','.join(unknown_kwargs))
+ self._filters = kwargs
+
+ def match(self, call_tuple):
+ for k, v in self._filters.items():
+ try:
+ pos = self._position_lookup[k]
+ if call_tuple[pos] != v:
+ return Mismatch(
+ "Value for key is %r, not %r" % (call_tuple[pos], v)
+ )
+ except IndexError:
+ return Mismatch("Key %s is not present." % k)
+
+ def __str__(self):
+ return "<MatchesStatusCall %r>" % self._filters
diff --git a/setup.py b/setup.py
index 9917977..5a05002 100755
--- a/setup.py
+++ b/setup.py
@@ -10,6 +10,7 @@ else:
extra = {
'install_requires': [
'extras',
+ 'testscenarios',
'testtools>=0.9.34',
]
}
@@ -52,14 +53,15 @@ setup(
scripts = [
'filters/subunit-1to2',
'filters/subunit-2to1',
- 'filters/subunit2gtk',
- 'filters/subunit2junitxml',
- 'filters/subunit2pyunit',
'filters/subunit-filter',
'filters/subunit-ls',
'filters/subunit-notify',
+ 'filters/subunit-output',
'filters/subunit-stats',
'filters/subunit-tags',
+ 'filters/subunit2gtk',
+ 'filters/subunit2junitxml',
+ 'filters/subunit2pyunit',
'filters/tap2subunit',
],
**extra