diff options
-rw-r--r-- | .travis.yml | 2 | ||||
-rw-r--r-- | requirements.txt | 1 | ||||
-rw-r--r-- | testtools/compat.py | 5 | ||||
-rw-r--r-- | testtools/testresult/real.py | 376 |
4 files changed, 302 insertions, 82 deletions
diff --git a/.travis.yml b/.travis.yml index 8e62b21..0ca34e9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -37,7 +37,7 @@ matrix: env: SPHINX="<1.3" install: - - pip install fixtures $JINJA_REQ sphinx$SPHINX $TWISTED_REQ + - pip install fixtures $JINJA_REQ sphinx$SPHINX pyrsistent $TWISTED_REQ - python setup.py install script: diff --git a/requirements.txt b/requirements.txt index a944636..92adc5f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ pbr>=0.11 extras +pyrsistent # 'mimeparse' has not been uploaded by the maintainer with Python3 compat # but someone kindly uploaded a fixed version as 'python-mimeparse'. python-mimeparse diff --git a/testtools/compat.py b/testtools/compat.py index f4b9754..3f0bfab 100644 --- a/testtools/compat.py +++ b/testtools/compat.py @@ -1,4 +1,4 @@ -# Copyright (c) 2008-2011 testtools developers. See LICENSE for details. +# Copyright (c) 2008-2015 testtools developers. See LICENSE for details. """Compatibility support for python 2 and 3.""" @@ -14,6 +14,7 @@ __all__ = [ 'StringIO', 'reraise', 'unicode_output_stream', + 'text_or_bytes', ] import codecs @@ -66,6 +67,7 @@ if sys.version_info > (3, 0): def classtypes(): return (type,) str_is_unicode = True + text_or_bytes = (str, bytes) else: import __builtin__ as builtins def _u(s): @@ -83,6 +85,7 @@ else: import types return (type, types.ClassType) str_is_unicode = sys.platform == "cli" + text_or_bytes = (unicode, str) _u.__doc__ = __u_doc diff --git a/testtools/testresult/real.py b/testtools/testresult/real.py index 8773c65..e9fb540 100644 --- a/testtools/testresult/real.py +++ b/testtools/testresult/real.py @@ -32,7 +32,9 @@ from extras import safe_hasattr, try_import, try_imports parse_mime_type = try_import('mimeparse.parse_mime_type') Queue = try_imports(['Queue.Queue', 'queue.Queue']) -from testtools.compat import str_is_unicode, _u, _b +from pyrsistent import PClass, field, pmap_field, pset_field, pmap, pset, thaw + +from testtools.compat import str_is_unicode, text_or_bytes, _u, _b from testtools.content import ( Content, text_content, @@ -267,6 +269,46 @@ class TestResult(unittest.TestResult): """ +"""Interim states: + +* None - no particular status is being reported, or status being reported is + not associated with a test (e.g. when reporting on stdout / stderr chatter). + +* inprogress - the test is currently running. Emitted by tests when they start + running and at any intermediary point they might choose to indicate their + continual operation. +""" +INTERIM_STATES = frozenset([None, 'inprogress']) + +"""Final states: + +* exists - the test exists. This is used when a test is not being executed. + Typically this is when querying what tests could be run in a test run (which + is useful for selecting tests to run). + +* xfail - the test failed but that was expected. This is purely informative - + the test is not considered to be a failure. + +* uxsuccess - the test passed but was expected to fail. The test will be + considered a failure. + +* success - the test has finished without error. + +* fail - the test failed (or errored). The test will be considered a failure. + +* skip - the test was selected to run but chose to be skipped. e.g. a test + dependency was missing. This is purely informative: the test is not + considered to be a failure. + +* unknown - we don't know what state the test is in +""" +FINAL_STATES = frozenset( + ['exists', 'xfail', 'uxsuccess', 'success', 'fail', 'skip', 'unknown']) + + +STATES = INTERIM_STATES | FINAL_STATES + + class StreamResult(object): """A test result for reporting the activity of a test run. @@ -592,112 +634,277 @@ class StreamTagger(CopyStreamResult): super(StreamTagger, self).status(*args, **kwargs) -class StreamToDict(StreamResult): +class _TestRecord(PClass): + """Representation of a test.""" + + """The test id.""" + id = field(text_or_bytes, mandatory=True) + + """Tags for the test.""" + tags = pset_field(text_or_bytes, optional=False) + + """File attachments.""" + # XXX: Documentation says these are unicode, but tests pass in str. + details = pmap_field(text_or_bytes, Content, optional=False) + + """One of the StreamResult status codes.""" + status = field( + text_or_bytes, mandatory=True, + invariant=lambda x: (x in STATES, 'Invalid state')) + + """Pair of timestamps (x, y). + + x is the first timestamp we received for this test, y is the one that + triggered the notification. y can be None if the test hanged. + """ + timestamps = field(tuple, mandatory=True) + + @classmethod + def create(cls, test_id, timestamp): + return cls( + id=test_id, + tags=pset(), + details=pmap(), + status='unknown', + timestamps=(timestamp, None), + ) + + def to_dict(self): + """Convert record into a "test dict". + + A "test dict" is a concept used in other parts of the code-base. It + has the following keys: + + * id: the test id. + * tags: The tags for the test. A set of unicode strings. + * details: A dict of file attachments - ``testtools.content.Content`` + objects. + * status: One of the StreamResult status codes (including inprogress) + or 'unknown' (used if only file events for a test were received...) + * timestamps: A pair of timestamps - the first one received with this + test id, and the one in the event that triggered the notification. + Hung tests have a None for the second end event. Timestamps are not + compared - their ordering is purely order received in the stream. + """ + return { + 'id': self.id, + 'tags': thaw(self.tags), + 'details': thaw(self.details), + 'status': self.status, + 'timestamps': list(self.timestamps), + } + + def got_timestamp(self, timestamp): + """Called when we receive a timestamp. + + This will always update the second element of the 'timestamps' tuple. + It doesn't compare timestamps at all. + """ + return self.set(timestamps=(self.timestamps[0], timestamp)) + + def got_file(self, file_name, file_bytes, mime_type=None): + """Called when we receive file information. + + ``mime_type`` is only used when this is the first time we've seen data + from this file. + """ + if file_name in self.details: + case = self + else: + content_type = _make_content_type(mime_type) + content_bytes = [] + case = self.transform( + ['details', file_name], + Content(content_type, lambda: content_bytes)) + + case.details[file_name].iter_bytes().append(file_bytes) + return case + + def to_test_case(self): + """Convert into a TestCase object. + + :return: A PlaceHolder test object. + """ + # Circular import. + global PlaceHolder + if PlaceHolder is None: + from testtools.testcase import PlaceHolder + outcome = _status_map[self.status] + return PlaceHolder( + self.id, + outcome=outcome, + details=thaw(self.details), + tags=thaw(self.tags), + timestamps=self.timestamps, + ) + + +def _make_content_type(mime_type=None): + """Return ContentType for a given mime type. + + testtools was emitting a bad encoding, and this works around it. + Unfortunately, is also loses data - probably want to drop this in a few + releases. + """ + # XXX: Not sure what release this was added, so "in a few releases" is + # unactionable. + if mime_type is None: + mime_type = 'application/octet-stream' + + primary, sub, parameters = parse_mime_type(mime_type) + if 'charset' in parameters: + if ',' in parameters['charset']: + parameters['charset'] = parameters['charset'][ + :parameters['charset'].find(',')] + + return ContentType(primary, sub, parameters) + + +_status_map = pmap({ + 'inprogress': 'addFailure', + 'unknown': 'addFailure', + 'success': 'addSuccess', + 'skip': 'addSkip', + 'fail': 'addFailure', + 'xfail': 'addExpectedFailure', + 'uxsuccess': 'addUnexpectedSuccess', +}) + + +class _StreamToTestRecord(StreamResult): """A specialised StreamResult that emits a callback as tests complete. Top level file attachments are simply discarded. Hung tests are detected by stopTestRun and notified there and then. - The callback is passed a dict with the following keys: - - * id: the test id. - * tags: The tags for the test. A set of unicode strings. - * details: A dict of file attachments - ``testtools.content.Content`` - objects. - * status: One of the StreamResult status codes (including inprogress) or - 'unknown' (used if only file events for a test were received...) - * timestamps: A pair of timestamps - the first one received with this - test id, and the one in the event that triggered the notification. - Hung tests have a None for the second end event. Timestamps are not - compared - their ordering is purely order received in the stream. + The callback is passed a ``_TestRecord`` object. Only the most recent tags observed in the stream are reported. """ def __init__(self, on_test): - """Create a StreamToDict calling on_test on test completions. + """Create a _StreamToTestRecord calling on_test on test completions. - :param on_test: A callback that accepts one parameter - a dict - describing a test. + :param on_test: A callback that accepts one parameter: + a ``_TestRecord`` object describing a test. """ - super(StreamToDict, self).__init__() + super(_StreamToTestRecord, self).__init__() self.on_test = on_test if parse_mime_type is None: raise ImportError("mimeparse module missing.") def startTestRun(self): - super(StreamToDict, self).startTestRun() + super(_StreamToTestRecord, self).startTestRun() self._inprogress = {} def status(self, test_id=None, test_status=None, test_tags=None, runnable=True, file_name=None, file_bytes=None, eof=False, mime_type=None, route_code=None, timestamp=None): - super(StreamToDict, self).status( + super(_StreamToTestRecord, self).status( test_id, test_status, test_tags=test_tags, runnable=runnable, file_name=file_name, file_bytes=file_bytes, eof=eof, mime_type=mime_type, route_code=route_code, timestamp=timestamp) + key = self._ensure_key(test_id, route_code, timestamp) - # update fields if not key: return + + # update fields + self._inprogress[key] = self._update_case( + self._inprogress[key], test_status, test_tags, file_name, + file_bytes, mime_type, timestamp) + + # notify completed tests. + if test_status not in INTERIM_STATES: + self.on_test(self._inprogress.pop(key)) + + def _update_case(self, case, test_status=None, test_tags=None, + file_name=None, file_bytes=None, mime_type=None, + timestamp=None): if test_status is not None: - self._inprogress[key]['status'] = test_status - self._inprogress[key]['timestamps'][1] = timestamp - case = self._inprogress[key] + case = case.set(status=test_status) + + case = case.got_timestamp(timestamp) + if file_name is not None: - if file_name not in case['details']: - if mime_type is None: - mime_type = 'application/octet-stream' - primary, sub, parameters = parse_mime_type(mime_type) - if 'charset' in parameters: - if ',' in parameters['charset']: - # testtools was emitting a bad encoding, workaround it, - # Though this does lose data - probably want to drop - # this in a few releases. - parameters['charset'] = parameters['charset'][ - :parameters['charset'].find(',')] - content_type = ContentType(primary, sub, parameters) - content_bytes = [] - case['details'][file_name] = Content( - content_type, lambda: content_bytes) - case['details'][file_name].iter_bytes().append(file_bytes) + case = case.got_file(file_name, file_bytes, mime_type) + if test_tags is not None: - self._inprogress[key]['tags'] = test_tags - # notify completed tests. - if test_status not in (None, 'inprogress'): - self.on_test(self._inprogress.pop(key)) + case = case.set('tags', test_tags) + + return case def stopTestRun(self): - super(StreamToDict, self).stopTestRun() + super(_StreamToTestRecord, self).stopTestRun() while self._inprogress: case = self._inprogress.popitem()[1] - case['timestamps'][1] = None - self.on_test(case) + self.on_test(case.got_timestamp(None)) def _ensure_key(self, test_id, route_code, timestamp): if test_id is None: return key = (test_id, route_code) if key not in self._inprogress: - self._inprogress[key] = { - 'id': test_id, - 'tags': set(), - 'details': {}, - 'status': 'unknown', - 'timestamps': [timestamp, None]} + self._inprogress[key] = _TestRecord.create(test_id, timestamp) return key -_status_map = { - 'inprogress': 'addFailure', - 'unknown': 'addFailure', - 'success': 'addSuccess', - 'skip': 'addSkip', - 'fail': 'addFailure', - 'xfail': 'addExpectedFailure', - 'uxsuccess': 'addUnexpectedSuccess', - } +class StreamToDict(StreamResult): + """A specialised StreamResult that emits a callback as tests complete. + + Top level file attachments are simply discarded. Hung tests are detected + by stopTestRun and notified there and then. + + The callback is passed a dict with the following keys: + + * id: the test id. + * tags: The tags for the test. A set of unicode strings. + * details: A dict of file attachments - ``testtools.content.Content`` + objects. + * status: One of the StreamResult status codes (including inprogress) or + 'unknown' (used if only file events for a test were received...) + * timestamps: A pair of timestamps - the first one received with this + test id, and the one in the event that triggered the notification. + Hung tests have a None for the second end event. Timestamps are not + compared - their ordering is purely order received in the stream. + + Only the most recent tags observed in the stream are reported. + """ + + # XXX: This could actually be replaced by a very simple function. + # Unfortunately, subclassing is a supported API. + + # XXX: Alternative simplification is to extract a StreamAdapter base + # class, and have this inherit from that. + + def __init__(self, on_test): + """Create a _StreamToTestRecord calling on_test on test completions. + + :param on_test: A callback that accepts one parameter: + a dictionary describing a test. + """ + super(StreamToDict, self).__init__() + self._hook = _StreamToTestRecord(self._handle_test) + # XXX: Not clear whether its part of the supported interface for + # self.on_test to be the passed-in on_test. If not, we could reduce + # the boilerplate by subclassing _StreamToTestRecord. + self.on_test = on_test + + def _handle_test(self, test_record): + self.on_test(test_record.to_dict()) + + def startTestRun(self): + super(StreamToDict, self).startTestRun() + self._hook.startTestRun() + + def status(self, *args, **kwargs): + super(StreamToDict, self).status(*args, **kwargs) + self._hook.status(*args, **kwargs) + + def stopTestRun(self): + super(StreamToDict, self).stopTestRun() + self._hook.stopTestRun() def test_dict_to_case(test_dict): @@ -706,17 +913,16 @@ def test_dict_to_case(test_dict): :param test_dict: A test dict as generated by StreamToDict. :return: A PlaceHolder test object. """ - # Circular import. - global PlaceHolder - if PlaceHolder is None: - from testtools.testcase import PlaceHolder - outcome = _status_map[test_dict['status']] - return PlaceHolder( - test_dict['id'], outcome=outcome, details=test_dict['details'], - tags=test_dict['tags'], timestamps=test_dict['timestamps']) + return _TestRecord( + id=test_dict['id'], + tags=test_dict['tags'], + details=test_dict['details'], + status=test_dict['status'], + timestamps=tuple(test_dict['timestamps']), + ).to_test_case() -class StreamSummary(StreamToDict): +class StreamSummary(StreamResult): """A specialised StreamResult that summarises a stream. The summary uses the same representation as the original @@ -725,7 +931,8 @@ class StreamSummary(StreamToDict): """ def __init__(self): - super(StreamSummary, self).__init__(self._gather_test) + super(StreamSummary, self).__init__() + self._hook = _StreamToTestRecord(self._gather_test) self._handle_status = { 'success': self._success, 'skip': self._skip, @@ -745,6 +952,15 @@ class StreamSummary(StreamToDict): self.skipped = [] self.expectedFailures = [] self.unexpectedSuccesses = [] + self._hook.startTestRun() + + def status(self, *args, **kwargs): + super(StreamSummary, self).status(*args, **kwargs) + self._hook.status(*args, **kwargs) + + def stopTestRun(self): + super(StreamSummary, self).stopTestRun() + self._hook.stopTestRun() def wasSuccessful(self): """Return False if any failure has occured. @@ -754,12 +970,12 @@ class StreamSummary(StreamToDict): """ return (not self.failures and not self.errors) - def _gather_test(self, test_dict): - if test_dict['status'] == 'exists': + def _gather_test(self, test_record): + if test_record.status == 'exists': return self.testsRun += 1 - case = test_dict_to_case(test_dict) - self._handle_status[test_dict['status']](case) + case = test_record.to_test_case() + self._handle_status[test_record.status](case) def _incomplete(self, case): self.errors.append((case, "Test did not complete")) @@ -1472,8 +1688,8 @@ class StreamToExtendedDecorator(StreamResult): # ExtendedToOriginalDecorator takes care of thunking details back to # exceptions/reasons etc. self.decorated = ExtendedToOriginalDecorator(decorated) - # StreamToDict buffers and gives us individual tests. - self.hook = StreamToDict(self._handle_tests) + # _StreamToTestRecord buffers and gives us individual tests. + self.hook = _StreamToTestRecord(self._handle_tests) def status(self, test_id=None, test_status=None, *args, **kwargs): if test_status == 'exists': @@ -1489,8 +1705,8 @@ class StreamToExtendedDecorator(StreamResult): self.hook.stopTestRun() self.decorated.stopTestRun() - def _handle_tests(self, test_dict): - case = test_dict_to_case(test_dict) + def _handle_tests(self, test_record): + case = test_record.to_test_case() case.run(self.decorated) |