summaryrefslogtreecommitdiff
path: root/python/subunit/__init__.py
diff options
context:
space:
mode:
authorRobert Collins <robertc@robertcollins.net>2013-03-03 22:28:51 +1300
committerRobert Collins <robertc@robertcollins.net>2013-03-03 22:28:51 +1300
commitb1d5d5881f378273ce0dc8d8301d4ba25bcd4988 (patch)
tree5731c7f2a9f000a5c6aa6784311dd901146dd1a6 /python/subunit/__init__.py
parent1565f398b5520f45be1852c3d8a8ffdcf8dccfe2 (diff)
downloadsubunit-git-b1d5d5881f378273ce0dc8d8301d4ba25bcd4988.tar.gz
Port existing filters to v2.
Diffstat (limited to 'python/subunit/__init__.py')
-rw-r--r--python/subunit/__init__.py98
1 files changed, 53 insertions, 45 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py
index ced5b4a..ad749ff 100644
--- a/python/subunit/__init__.py
+++ b/python/subunit/__init__.py
@@ -143,7 +143,7 @@ try:
except ImportError:
raise ImportError ("testtools.testresult.real does not contain "
"_StringException, check your version.")
-from testtools import testresult
+from testtools import testresult, CopyStreamResult
from subunit import chunked, details, iso8601, test_results
from subunit.v2 import ByteStreamToStreamResult, StreamResultToBytes
@@ -993,44 +993,51 @@ def run_isolated(klass, self, result):
return result
-def TAP2SubUnit(tap, subunit):
+def TAP2SubUnit(tap, output_stream):
"""Filter a TAP pipe into a subunit pipe.
- :param tap: A tap pipe/stream/file object.
+ This should be invoked once per TAP script, as TAP scripts get
+ mapped to a single runnable case with multiple components.
+
+ :param tap: A tap pipe/stream/file object - should emit unicode strings.
:param subunit: A pipe/stream/file object to write subunit results to.
:return: The exit code to exit with.
"""
+ output = StreamResultToBytes(output_stream)
+ UTF8_TEXT = 'text/plain; charset=UTF8'
BEFORE_PLAN = 0
AFTER_PLAN = 1
SKIP_STREAM = 2
state = BEFORE_PLAN
plan_start = 1
plan_stop = 0
- def _skipped_test(subunit, plan_start):
- # Some tests were skipped.
- subunit.write('test test %d\n' % plan_start)
- subunit.write('error test %d [\n' % plan_start)
- subunit.write('test missing from TAP output\n')
- subunit.write(']\n')
- return plan_start + 1
# Test data for the next test to emit
test_name = None
log = []
result = None
+ def missing_test(plan_start):
+ output.status(test_id='test %d' % plan_start,
+ test_status='fail', runnable=False,
+ mime_type=UTF8_TEXT, eof=True, file_name="tap meta",
+ file_bytes=b"test missing from TAP output")
def _emit_test():
"write out a test"
if test_name is None:
return
- subunit.write("test %s\n" % test_name)
- if not log:
- subunit.write("%s %s\n" % (result, test_name))
- else:
- subunit.write("%s %s [\n" % (result, test_name))
if log:
- for line in log:
- subunit.write("%s\n" % line)
- subunit.write("]\n")
+ log_bytes = b'\n'.join(log_line.encode('utf8') for log_line in log)
+ mime_type = UTF8_TEXT
+ file_name = 'tap comment'
+ eof = True
+ else:
+ log_bytes = None
+ mime_type = None
+ file_name = None
+ eof = True
del log[:]
+ output.status(test_id=test_name, test_status=result,
+ file_bytes=log_bytes, mime_type=mime_type, eof=eof,
+ file_name=file_name, runnable=False)
for line in tap:
if state == BEFORE_PLAN:
match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
@@ -1041,10 +1048,9 @@ def TAP2SubUnit(tap, subunit):
if plan_start > plan_stop and plan_stop == 0:
# skipped file
state = SKIP_STREAM
- subunit.write("test file skip\n")
- subunit.write("skip file skip [\n")
- subunit.write("%s\n" % comment)
- subunit.write("]\n")
+ output.status(test_id='file skip', test_status='skip',
+ file_bytes=comment.encode('utf8'), eof=True,
+ file_name='tap comment')
continue
# not a plan line, or have seen one before
match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n", line)
@@ -1055,7 +1061,7 @@ def TAP2SubUnit(tap, subunit):
if status == 'ok':
result = 'success'
else:
- result = "failure"
+ result = "fail"
if description is None:
description = ''
else:
@@ -1070,7 +1076,8 @@ def TAP2SubUnit(tap, subunit):
if number is not None:
number = int(number)
while plan_start < number:
- plan_start = _skipped_test(subunit, plan_start)
+ missing_test(plan_start)
+ plan_start += 1
test_name = "test %d%s" % (plan_start, description)
plan_start += 1
continue
@@ -1083,18 +1090,21 @@ def TAP2SubUnit(tap, subunit):
extra = ' %s' % reason
_emit_test()
test_name = "Bail out!%s" % extra
- result = "error"
+ result = "fail"
state = SKIP_STREAM
continue
match = re.match("\#.*\n", line)
if match:
log.append(line[:-1])
continue
- subunit.write(line)
+ # Should look at buffering status and binding this to the prior result.
+ output.status(file_bytes=line.encode('utf8'), file_name='stdout',
+ mime_type=UTF8_TEXT)
_emit_test()
while plan_start <= plan_stop:
# record missed tests
- plan_start = _skipped_test(subunit, plan_start)
+ missing_test(plan_start)
+ plan_start += 1
return 0
@@ -1122,24 +1132,21 @@ def tag_stream(original, filtered, tags):
:return: 0
"""
new_tags, gone_tags = tags_to_new_gone(tags)
- def write_tags(new_tags, gone_tags):
- if new_tags or gone_tags:
- filtered.write("tags: " + ' '.join(new_tags))
- if gone_tags:
- for tag in gone_tags:
- filtered.write("-" + tag)
- filtered.write("\n")
- write_tags(new_tags, gone_tags)
- # TODO: use the protocol parser and thus don't mangle test comments.
- for line in original:
- if line.startswith("tags:"):
- line_tags = line[5:].split()
- line_new, line_gone = tags_to_new_gone(line_tags)
- line_new = line_new - gone_tags
- line_gone = line_gone - new_tags
- write_tags(line_new, line_gone)
- else:
- filtered.write(line)
+ source = ByteStreamToStreamResult(original, non_subunit_name='stdout')
+ class Tagger(CopyStreamResult):
+ def status(self, **kwargs):
+ tags = kwargs.get('test_tags')
+ if not tags:
+ tags = set()
+ tags.update(new_tags)
+ tags.difference_update(gone_tags)
+ if tags:
+ kwargs['test_tags'] = tags
+ else:
+ kwargs['test_tags'] = None
+ super(Tagger, self).status(**kwargs)
+ output = Tagger([StreamResultToBytes(filtered)])
+ source.run(output)
return 0
@@ -1292,6 +1299,7 @@ def make_stream_binary(stream):
_make_binary_on_windows(fileno)
return _unwrap_text(stream)
+
def _make_binary_on_windows(fileno):
"""Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
if sys.platform == "win32":