diff options
| author | Robert Collins <robertc@robertcollins.net> | 2011-04-27 16:58:50 +1200 |
|---|---|---|
| committer | Robert Collins <robertc@robertcollins.net> | 2011-04-27 16:58:50 +1200 |
| commit | 22df1b70c04ec46167e17945bab835cf5a4d95f9 (patch) | |
| tree | 9f379fc152b76ddabd7523457bc5085453de5735 /python/subunit/__init__.py | |
| parent | fac837618f84531205f6491fb9d72bde8717fbb8 (diff) | |
| parent | 93731a8e206c5a9bc73b682f9ac2716ae5ad3977 (diff) | |
| download | subunit-git-22df1b70c04ec46167e17945bab835cf5a4d95f9.tar.gz | |
Merge python 3K changes.
Diffstat (limited to 'python/subunit/__init__.py')
| -rw-r--r-- | python/subunit/__init__.py | 192 |
1 files changed, 121 insertions, 71 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py index 368d3b2..e92eb26 100644 --- a/python/subunit/__init__.py +++ b/python/subunit/__init__.py @@ -118,23 +118,28 @@ Utility modules import os import re -from StringIO import StringIO import subprocess import sys import unittest -import iso8601 from testtools import content, content_type, ExtendedToOriginalDecorator +from testtools.compat import _b, _u, BytesIO, StringIO try: from testtools.testresult.real import _StringException RemoteException = _StringException - _remote_exception_str = '_StringException' # For testing. + # For testing: different pythons have different str() implementations. + if sys.version_info > (3, 0): + _remote_exception_str = "testtools.testresult.real._StringException" + _remote_exception_str_chunked = "34\r\n" + _remote_exception_str + else: + _remote_exception_str = "_StringException" + _remote_exception_str_chunked = "1A\r\n" + _remote_exception_str except ImportError: raise ImportError ("testtools.testresult.real does not contain " "_StringException, check your version.") from testtools import testresult -import chunked, details, test_results +from subunit import chunked, details, iso8601, test_results PROGRESS_SET = 0 @@ -186,6 +191,18 @@ class _ParserState(object): def __init__(self, parser): self.parser = parser + self._test_sym = (_b('test'), _b('testing')) + self._colon_sym = _b(':') + self._error_sym = (_b('error'),) + self._failure_sym = (_b('failure'),) + self._progress_sym = (_b('progress'),) + self._skip_sym = _b('skip') + self._success_sym = (_b('success'), _b('successful')) + self._tags_sym = (_b('tags'),) + self._time_sym = (_b('time'),) + self._xfail_sym = (_b('xfail'),) + self._start_simple = _u(" [") + self._start_multipart = _u(" [ multipart") def addError(self, offset, line): """An 'error:' directive has been read.""" @@ -213,26 +230,26 @@ class _ParserState(object): if len(parts) == 2 and line.startswith(parts[0]): cmd, rest = parts offset = len(cmd) + 1 - cmd = cmd.rstrip(':') - if cmd in ('test', 'testing'): + cmd = cmd.rstrip(self._colon_sym) + if cmd in self._test_sym: self.startTest(offset, line) - elif cmd == 'error': + elif cmd in self._error_sym: self.addError(offset, line) - elif cmd == 'failure': + elif cmd in self._failure_sym: self.addFailure(offset, line) - elif cmd == 'progress': + elif cmd in self._progress_sym: self.parser._handleProgress(offset, line) - elif cmd == 'skip': + elif cmd in self._skip_sym: self.addSkip(offset, line) - elif cmd in ('success', 'successful'): + elif cmd in self._success_sym: self.addSuccess(offset, line) - elif cmd in ('tags',): + elif cmd in self._tags_sym: self.parser._handleTags(offset, line) self.parser.subunitLineReceived(line) - elif cmd in ('time',): + elif cmd in self._time_sym: self.parser._handleTime(offset, line) self.parser.subunitLineReceived(line) - elif cmd == 'xfail': + elif cmd in self._xfail_sym: self.addExpectedFail(offset, line) else: self.parser.stdOutLineReceived(line) @@ -241,7 +258,7 @@ class _ParserState(object): def lostConnection(self): """Connection lost.""" - self.parser._lostConnectionInTest(u'unknown state of ') + self.parser._lostConnectionInTest(_u('unknown state of ')) def startTest(self, offset, line): """A test start command received.""" @@ -258,19 +275,21 @@ class _InTest(_ParserState): :param details_state: The state to switch to for details processing of this outcome. """ - if self.parser.current_test_description == line[offset:-1]: + test_name = line[offset:-1].decode('utf8') + if self.parser.current_test_description == test_name: self.parser._state = self.parser._outside_test self.parser.current_test_description = None no_details() self.parser.client.stopTest(self.parser._current_test) self.parser._current_test = None self.parser.subunitLineReceived(line) - elif self.parser.current_test_description + " [" == line[offset:-1]: + elif self.parser.current_test_description + self._start_simple == \ + test_name: self.parser._state = details_state details_state.set_simple() self.parser.subunitLineReceived(line) - elif self.parser.current_test_description + " [ multipart" == \ - line[offset:-1]: + elif self.parser.current_test_description + self._start_multipart == \ + test_name: self.parser._state = details_state details_state.set_multipart() self.parser.subunitLineReceived(line) @@ -321,7 +340,7 @@ class _InTest(_ParserState): def lostConnection(self): """Connection lost.""" - self.parser._lostConnectionInTest(u'') + self.parser._lostConnectionInTest(_u('')) class _OutSideTest(_ParserState): @@ -333,8 +352,9 @@ class _OutSideTest(_ParserState): def startTest(self, offset, line): """A test start command received.""" self.parser._state = self.parser._in_test - self.parser._current_test = RemotedTestCase(line[offset:-1]) - self.parser.current_test_description = line[offset:-1] + test_name = line[offset:-1].decode('utf8') + self.parser._current_test = RemotedTestCase(test_name) + self.parser.current_test_description = test_name self.parser.client.startTest(self.parser._current_test) self.parser.subunitLineReceived(line) @@ -356,7 +376,7 @@ class _ReadingDetails(_ParserState): def lostConnection(self): """Connection lost.""" - self.parser._lostConnectionInTest(u'%s report of ' % + self.parser._lostConnectionInTest(_u('%s report of ') % self._outcome_label()) def _outcome_label(self): @@ -440,7 +460,7 @@ class TestProtocolServer(object): :param stream: The stream that lines received which are not part of the subunit protocol should be written to. This allows custom handling of mixed protocols. By default, sys.stdout will be used for - convenience. + convenience. It should accept bytes to its write() method. :param forward_stream: A stream to forward subunit lines to. This allows a filter to forward the entire stream while still parsing and acting on it. By default forward_stream is set to @@ -449,6 +469,8 @@ class TestProtocolServer(object): self.client = ExtendedToOriginalDecorator(client) if stream is None: stream = sys.stdout + if sys.version_info > (3, 0): + stream = stream.buffer self._stream = stream self._forward_stream = forward_stream or DiscardStream() # state objects we can switch too @@ -461,17 +483,21 @@ class TestProtocolServer(object): self._reading_xfail_details = _ReadingExpectedFailureDetails(self) # start with outside test. self._state = self._outside_test + # Avoid casts on every call + self._plusminus = _b('+-') + self._push_sym = _b('push') + self._pop_sym = _b('pop') def _handleProgress(self, offset, line): """Process a progress directive.""" line = line[offset:].strip() - if line[0] in '+-': + if line[0] in self._plusminus: whence = PROGRESS_CUR delta = int(line) - elif line == "push": + elif line == self._push_sym: whence = PROGRESS_PUSH delta = None - elif line == "pop": + elif line == self._pop_sym: whence = PROGRESS_POP delta = None else: @@ -481,7 +507,7 @@ class TestProtocolServer(object): def _handleTags(self, offset, line): """Process a tags command.""" - tags = line[offset:].split() + tags = line[offset:].decode('utf8').split() new_tags, gone_tags = tags_to_new_gone(tags) self.client.tags(new_tags, gone_tags) @@ -489,8 +515,9 @@ class TestProtocolServer(object): # Accept it, but do not do anything with it yet. try: event_time = iso8601.parse_date(line[offset:-1]) - except TypeError, e: - raise TypeError("Failed to parse %r, got %r" % (line, e)) + except TypeError: + raise TypeError(_u("Failed to parse %r, got %r") + % (line, sys.exec_info[1])) self.client.time(event_time) def lineReceived(self, line): @@ -498,7 +525,7 @@ class TestProtocolServer(object): self._state.lineReceived(line) def _lostConnectionInTest(self, state_string): - error_string = u"lost connection during %stest '%s'" % ( + error_string = _u("lost connection during %stest '%s'") % ( state_string, self.current_test_description) self.client.addError(self._current_test, RemoteError(error_string)) self.client.stopTest(self._current_test) @@ -533,7 +560,8 @@ class TestProtocolClient(testresult.TestResult): # Get a TestSuite or TestCase to run suite = make_suite() - # Create a stream (any object with a 'write' method) + # Create a stream (any object with a 'write' method). This should accept + # bytes not strings: subunit is a byte orientated protocol. stream = file('tests.log', 'wb') # Create a subunit result object which will output to the stream result = subunit.TestProtocolClient(stream) @@ -550,6 +578,14 @@ class TestProtocolClient(testresult.TestResult): testresult.TestResult.__init__(self) self._stream = stream _make_stream_binary(stream) + self._progress_fmt = _b("progress: ") + self._bytes_eol = _b("\n") + self._progress_plus = _b("+") + self._progress_push = _b("push") + self._progress_pop = _b("pop") + self._empty_bytes = _b("") + self._start_simple = _b(" [\n") + self._end_simple = _b("]\n") def addError(self, test, error=None, details=None): """Report an error in test test. @@ -611,42 +647,42 @@ class TestProtocolClient(testresult.TestResult): :param details: New Testing-in-python drafted API; a dict from string to subunit.Content objects. """ - self._stream.write("%s: %s" % (outcome, test.id())) + self._stream.write(_b("%s: %s" % (outcome, test.id()))) if error is None and details is None: raise ValueError if error is not None: - self._stream.write(" [\n") + self._stream.write(self._start_simple) # XXX: this needs to be made much stricter, along the lines of # Martin[gz]'s work in testtools. Perhaps subunit can use that? for line in self._exc_info_to_unicode(error, test).splitlines(): self._stream.write(("%s\n" % line).encode('utf8')) else: self._write_details(details) - self._stream.write("]\n") + self._stream.write(self._end_simple) def addSkip(self, test, reason=None, details=None): """Report a skipped test.""" if reason is None: self._addOutcome("skip", test, error=None, details=details) else: - self._stream.write("skip: %s [\n" % test.id()) - self._stream.write("%s\n" % reason) - self._stream.write("]\n") + self._stream.write(_b("skip: %s [\n" % test.id())) + self._stream.write(_b("%s\n" % reason)) + self._stream.write(self._end_simple) def addSuccess(self, test, details=None): """Report a success in a test.""" - self._stream.write("successful: %s" % test.id()) + self._stream.write(_b("successful: %s" % test.id())) if not details: - self._stream.write("\n") + self._stream.write(_b("\n")) else: self._write_details(details) - self._stream.write("]\n") + self._stream.write(self._end_simple) addUnexpectedSuccess = addSuccess def startTest(self, test): """Mark a test as starting its test run.""" super(TestProtocolClient, self).startTest(test) - self._stream.write("test: %s\n" % test.id()) + self._stream.write(_b("test: %s\n" % test.id())) self._stream.flush() def stopTest(self, test): @@ -664,16 +700,19 @@ class TestProtocolClient(testresult.TestResult): PROGRESS_POP. """ if whence == PROGRESS_CUR and offset > -1: - prefix = "+" + prefix = self._progress_plus + offset = _b(str(offset)) elif whence == PROGRESS_PUSH: - prefix = "" - offset = "push" + prefix = self._empty_bytes + offset = self._progress_push elif whence == PROGRESS_POP: - prefix = "" - offset = "pop" + prefix = self._empty_bytes + offset = self._progress_pop else: - prefix = "" - self._stream.write("progress: %s%s\n" % (prefix, offset)) + prefix = self._empty_bytes + offset = _b(str(offset)) + self._stream.write(self._progress_fmt + prefix + offset + + self._bytes_eol) def time(self, a_datetime): """Inform the client of the time. @@ -681,36 +720,36 @@ class TestProtocolClient(testresult.TestResult): ":param datetime: A datetime.datetime object. """ time = a_datetime.astimezone(iso8601.Utc()) - self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % ( + self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % ( time.year, time.month, time.day, time.hour, time.minute, - time.second, time.microsecond)) + time.second, time.microsecond))) def _write_details(self, details): """Output details to the stream. :param details: An extended details dict for a test outcome. """ - self._stream.write(" [ multipart\n") - for name, content in sorted(details.iteritems()): - self._stream.write("Content-Type: %s/%s" % - (content.content_type.type, content.content_type.subtype)) + self._stream.write(_b(" [ multipart\n")) + for name, content in sorted(details.items()): + self._stream.write(_b("Content-Type: %s/%s" % + (content.content_type.type, content.content_type.subtype))) parameters = content.content_type.parameters if parameters: - self._stream.write(";") + self._stream.write(_b(";")) param_strs = [] - for param, value in parameters.iteritems(): + for param, value in parameters.items(): param_strs.append("%s=%s" % (param, value)) - self._stream.write(",".join(param_strs)) - self._stream.write("\n%s\n" % name) + self._stream.write(_b(",".join(param_strs))) + self._stream.write(_b("\n%s\n" % name)) encoder = chunked.Encoder(self._stream) - map(encoder.write, content.iter_bytes()) + list(map(encoder.write, content.iter_bytes())) encoder.close() def done(self): """Obey the testtools result.done() interface.""" -def RemoteError(description=u""): +def RemoteError(description=_u("")): return (_StringException, _StringException(description), None) @@ -760,7 +799,7 @@ class RemotedTestCase(unittest.TestCase): def run(self, result=None): if result is None: result = self.defaultTestResult() result.startTest(self) - result.addError(self, RemoteError(u"Cannot run RemotedTestCases.\n")) + result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n"))) result.stopTest(self) def _strclass(self): @@ -796,7 +835,7 @@ class ExecTestCase(unittest.TestCase): protocol = TestProtocolServer(result) output = subprocess.Popen(self.script, shell=True, stdout=subprocess.PIPE).communicate()[0] - protocol.readFrom(StringIO(output)) + protocol.readFrom(BytesIO(output)) class IsolatedTestCase(unittest.TestCase): @@ -845,10 +884,10 @@ def run_isolated(klass, self, result): # at this point, sys.stdin is redirected, now we want # to filter it to escape ]'s. ### XXX: test and write that bit. - - result = TestProtocolClient(sys.stdout) + stream = os.fdopen(1, 'wb') + result = TestProtocolClient(stream) klass.run(self, result) - sys.stdout.flush() + stream.flush() sys.stderr.flush() # exit HARD, exit NOW. os._exit(0) @@ -858,7 +897,8 @@ def run_isolated(klass, self, result): os.close(c2pwrite) # hookup a protocol engine protocol = TestProtocolServer(result) - protocol.readFrom(os.fdopen(c2pread, 'rU')) + fileobj = os.fdopen(c2pread, 'rb') + protocol.readFrom(fileobj) os.waitpid(pid, 0) # TODO return code evaluation. return result @@ -1053,7 +1093,6 @@ class ProtocolTestCase(object): _make_stream_binary(stream) self._passthrough = passthrough self._forward = forward - _make_stream_binary(forward) def __call__(self, result=None): return self.run(result) @@ -1129,9 +1168,17 @@ def get_default_formatter(): if formatter: return os.popen(formatter, "w") else: - return sys.stdout + stream = sys.stdout + if sys.version_info > (3, 0): + stream = stream.buffer + return stream +if sys.version_info > (3, 0): + from io import UnsupportedOperation as _NoFilenoError +else: + _NoFilenoError = AttributeError + def read_test_list(path): """Read a list of test ids from a file on disk. @@ -1147,8 +1194,11 @@ def read_test_list(path): def _make_stream_binary(stream): """Ensure that a stream will be binary safe. See _make_binary_on_windows.""" - if getattr(stream, 'fileno', None) is not None: - _make_binary_on_windows(stream.fileno()) + try: + fileno = stream.fileno() + except _NoFilenoError: + return + _make_binary_on_windows(fileno) def _make_binary_on_windows(fileno): """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078.""" |
