summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Lange <jml@mumak.net>2015-11-18 19:57:16 +0000
committerJonathan Lange <jml@mumak.net>2015-11-18 19:57:16 +0000
commit0fb16c69a3d36e1172a2891045879d92d9a77368 (patch)
tree6de0e3c95c5ecd8d1602cf5ec96bc0c37d0c7053
parent9e069841b47ecdb2a8233b0aae0cd8dc6b5e3e88 (diff)
parent7c9906eb8bc2c931e65d2001f871301e037e08a8 (diff)
downloadtesttools-0fb16c69a3d36e1172a2891045879d92d9a77368.tar.gz
Merge pull request #158 from jml/extract-test-record
Extract TestRecord object from StreamToDict
-rw-r--r--.travis.yml2
-rw-r--r--requirements.txt1
-rw-r--r--testtools/compat.py5
-rw-r--r--testtools/testresult/real.py376
4 files changed, 302 insertions, 82 deletions
diff --git a/.travis.yml b/.travis.yml
index 8e62b21..0ca34e9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -37,7 +37,7 @@ matrix:
env: SPHINX="<1.3"
install:
- - pip install fixtures $JINJA_REQ sphinx$SPHINX $TWISTED_REQ
+ - pip install fixtures $JINJA_REQ sphinx$SPHINX pyrsistent $TWISTED_REQ
- python setup.py install
script:
diff --git a/requirements.txt b/requirements.txt
index a944636..92adc5f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,6 @@
pbr>=0.11
extras
+pyrsistent
# 'mimeparse' has not been uploaded by the maintainer with Python3 compat
# but someone kindly uploaded a fixed version as 'python-mimeparse'.
python-mimeparse
diff --git a/testtools/compat.py b/testtools/compat.py
index f4b9754..3f0bfab 100644
--- a/testtools/compat.py
+++ b/testtools/compat.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2008-2011 testtools developers. See LICENSE for details.
+# Copyright (c) 2008-2015 testtools developers. See LICENSE for details.
"""Compatibility support for python 2 and 3."""
@@ -14,6 +14,7 @@ __all__ = [
'StringIO',
'reraise',
'unicode_output_stream',
+ 'text_or_bytes',
]
import codecs
@@ -66,6 +67,7 @@ if sys.version_info > (3, 0):
def classtypes():
return (type,)
str_is_unicode = True
+ text_or_bytes = (str, bytes)
else:
import __builtin__ as builtins
def _u(s):
@@ -83,6 +85,7 @@ else:
import types
return (type, types.ClassType)
str_is_unicode = sys.platform == "cli"
+ text_or_bytes = (unicode, str)
_u.__doc__ = __u_doc
diff --git a/testtools/testresult/real.py b/testtools/testresult/real.py
index 8773c65..e9fb540 100644
--- a/testtools/testresult/real.py
+++ b/testtools/testresult/real.py
@@ -32,7 +32,9 @@ from extras import safe_hasattr, try_import, try_imports
parse_mime_type = try_import('mimeparse.parse_mime_type')
Queue = try_imports(['Queue.Queue', 'queue.Queue'])
-from testtools.compat import str_is_unicode, _u, _b
+from pyrsistent import PClass, field, pmap_field, pset_field, pmap, pset, thaw
+
+from testtools.compat import str_is_unicode, text_or_bytes, _u, _b
from testtools.content import (
Content,
text_content,
@@ -267,6 +269,46 @@ class TestResult(unittest.TestResult):
"""
+"""Interim states:
+
+* None - no particular status is being reported, or status being reported is
+ not associated with a test (e.g. when reporting on stdout / stderr chatter).
+
+* inprogress - the test is currently running. Emitted by tests when they start
+ running and at any intermediary point they might choose to indicate their
+ continual operation.
+"""
+INTERIM_STATES = frozenset([None, 'inprogress'])
+
+"""Final states:
+
+* exists - the test exists. This is used when a test is not being executed.
+ Typically this is when querying what tests could be run in a test run (which
+ is useful for selecting tests to run).
+
+* xfail - the test failed but that was expected. This is purely informative -
+ the test is not considered to be a failure.
+
+* uxsuccess - the test passed but was expected to fail. The test will be
+ considered a failure.
+
+* success - the test has finished without error.
+
+* fail - the test failed (or errored). The test will be considered a failure.
+
+* skip - the test was selected to run but chose to be skipped. e.g. a test
+ dependency was missing. This is purely informative: the test is not
+ considered to be a failure.
+
+* unknown - we don't know what state the test is in
+"""
+FINAL_STATES = frozenset(
+ ['exists', 'xfail', 'uxsuccess', 'success', 'fail', 'skip', 'unknown'])
+
+
+STATES = INTERIM_STATES | FINAL_STATES
+
+
class StreamResult(object):
"""A test result for reporting the activity of a test run.
@@ -592,112 +634,277 @@ class StreamTagger(CopyStreamResult):
super(StreamTagger, self).status(*args, **kwargs)
-class StreamToDict(StreamResult):
+class _TestRecord(PClass):
+ """Representation of a test."""
+
+ """The test id."""
+ id = field(text_or_bytes, mandatory=True)
+
+ """Tags for the test."""
+ tags = pset_field(text_or_bytes, optional=False)
+
+ """File attachments."""
+ # XXX: Documentation says these are unicode, but tests pass in str.
+ details = pmap_field(text_or_bytes, Content, optional=False)
+
+ """One of the StreamResult status codes."""
+ status = field(
+ text_or_bytes, mandatory=True,
+ invariant=lambda x: (x in STATES, 'Invalid state'))
+
+ """Pair of timestamps (x, y).
+
+ x is the first timestamp we received for this test, y is the one that
+ triggered the notification. y can be None if the test hanged.
+ """
+ timestamps = field(tuple, mandatory=True)
+
+ @classmethod
+ def create(cls, test_id, timestamp):
+ return cls(
+ id=test_id,
+ tags=pset(),
+ details=pmap(),
+ status='unknown',
+ timestamps=(timestamp, None),
+ )
+
+ def to_dict(self):
+ """Convert record into a "test dict".
+
+ A "test dict" is a concept used in other parts of the code-base. It
+ has the following keys:
+
+ * id: the test id.
+ * tags: The tags for the test. A set of unicode strings.
+ * details: A dict of file attachments - ``testtools.content.Content``
+ objects.
+ * status: One of the StreamResult status codes (including inprogress)
+ or 'unknown' (used if only file events for a test were received...)
+ * timestamps: A pair of timestamps - the first one received with this
+ test id, and the one in the event that triggered the notification.
+ Hung tests have a None for the second end event. Timestamps are not
+ compared - their ordering is purely order received in the stream.
+ """
+ return {
+ 'id': self.id,
+ 'tags': thaw(self.tags),
+ 'details': thaw(self.details),
+ 'status': self.status,
+ 'timestamps': list(self.timestamps),
+ }
+
+ def got_timestamp(self, timestamp):
+ """Called when we receive a timestamp.
+
+ This will always update the second element of the 'timestamps' tuple.
+ It doesn't compare timestamps at all.
+ """
+ return self.set(timestamps=(self.timestamps[0], timestamp))
+
+ def got_file(self, file_name, file_bytes, mime_type=None):
+ """Called when we receive file information.
+
+ ``mime_type`` is only used when this is the first time we've seen data
+ from this file.
+ """
+ if file_name in self.details:
+ case = self
+ else:
+ content_type = _make_content_type(mime_type)
+ content_bytes = []
+ case = self.transform(
+ ['details', file_name],
+ Content(content_type, lambda: content_bytes))
+
+ case.details[file_name].iter_bytes().append(file_bytes)
+ return case
+
+ def to_test_case(self):
+ """Convert into a TestCase object.
+
+ :return: A PlaceHolder test object.
+ """
+ # Circular import.
+ global PlaceHolder
+ if PlaceHolder is None:
+ from testtools.testcase import PlaceHolder
+ outcome = _status_map[self.status]
+ return PlaceHolder(
+ self.id,
+ outcome=outcome,
+ details=thaw(self.details),
+ tags=thaw(self.tags),
+ timestamps=self.timestamps,
+ )
+
+
+def _make_content_type(mime_type=None):
+ """Return ContentType for a given mime type.
+
+ testtools was emitting a bad encoding, and this works around it.
+ Unfortunately, is also loses data - probably want to drop this in a few
+ releases.
+ """
+ # XXX: Not sure what release this was added, so "in a few releases" is
+ # unactionable.
+ if mime_type is None:
+ mime_type = 'application/octet-stream'
+
+ primary, sub, parameters = parse_mime_type(mime_type)
+ if 'charset' in parameters:
+ if ',' in parameters['charset']:
+ parameters['charset'] = parameters['charset'][
+ :parameters['charset'].find(',')]
+
+ return ContentType(primary, sub, parameters)
+
+
+_status_map = pmap({
+ 'inprogress': 'addFailure',
+ 'unknown': 'addFailure',
+ 'success': 'addSuccess',
+ 'skip': 'addSkip',
+ 'fail': 'addFailure',
+ 'xfail': 'addExpectedFailure',
+ 'uxsuccess': 'addUnexpectedSuccess',
+})
+
+
+class _StreamToTestRecord(StreamResult):
"""A specialised StreamResult that emits a callback as tests complete.
Top level file attachments are simply discarded. Hung tests are detected
by stopTestRun and notified there and then.
- The callback is passed a dict with the following keys:
-
- * id: the test id.
- * tags: The tags for the test. A set of unicode strings.
- * details: A dict of file attachments - ``testtools.content.Content``
- objects.
- * status: One of the StreamResult status codes (including inprogress) or
- 'unknown' (used if only file events for a test were received...)
- * timestamps: A pair of timestamps - the first one received with this
- test id, and the one in the event that triggered the notification.
- Hung tests have a None for the second end event. Timestamps are not
- compared - their ordering is purely order received in the stream.
+ The callback is passed a ``_TestRecord`` object.
Only the most recent tags observed in the stream are reported.
"""
def __init__(self, on_test):
- """Create a StreamToDict calling on_test on test completions.
+ """Create a _StreamToTestRecord calling on_test on test completions.
- :param on_test: A callback that accepts one parameter - a dict
- describing a test.
+ :param on_test: A callback that accepts one parameter:
+ a ``_TestRecord`` object describing a test.
"""
- super(StreamToDict, self).__init__()
+ super(_StreamToTestRecord, self).__init__()
self.on_test = on_test
if parse_mime_type is None:
raise ImportError("mimeparse module missing.")
def startTestRun(self):
- super(StreamToDict, self).startTestRun()
+ super(_StreamToTestRecord, self).startTestRun()
self._inprogress = {}
def status(self, test_id=None, test_status=None, test_tags=None,
runnable=True, file_name=None, file_bytes=None, eof=False,
mime_type=None, route_code=None, timestamp=None):
- super(StreamToDict, self).status(
+ super(_StreamToTestRecord, self).status(
test_id, test_status,
test_tags=test_tags, runnable=runnable, file_name=file_name,
file_bytes=file_bytes, eof=eof, mime_type=mime_type,
route_code=route_code, timestamp=timestamp)
+
key = self._ensure_key(test_id, route_code, timestamp)
- # update fields
if not key:
return
+
+ # update fields
+ self._inprogress[key] = self._update_case(
+ self._inprogress[key], test_status, test_tags, file_name,
+ file_bytes, mime_type, timestamp)
+
+ # notify completed tests.
+ if test_status not in INTERIM_STATES:
+ self.on_test(self._inprogress.pop(key))
+
+ def _update_case(self, case, test_status=None, test_tags=None,
+ file_name=None, file_bytes=None, mime_type=None,
+ timestamp=None):
if test_status is not None:
- self._inprogress[key]['status'] = test_status
- self._inprogress[key]['timestamps'][1] = timestamp
- case = self._inprogress[key]
+ case = case.set(status=test_status)
+
+ case = case.got_timestamp(timestamp)
+
if file_name is not None:
- if file_name not in case['details']:
- if mime_type is None:
- mime_type = 'application/octet-stream'
- primary, sub, parameters = parse_mime_type(mime_type)
- if 'charset' in parameters:
- if ',' in parameters['charset']:
- # testtools was emitting a bad encoding, workaround it,
- # Though this does lose data - probably want to drop
- # this in a few releases.
- parameters['charset'] = parameters['charset'][
- :parameters['charset'].find(',')]
- content_type = ContentType(primary, sub, parameters)
- content_bytes = []
- case['details'][file_name] = Content(
- content_type, lambda: content_bytes)
- case['details'][file_name].iter_bytes().append(file_bytes)
+ case = case.got_file(file_name, file_bytes, mime_type)
+
if test_tags is not None:
- self._inprogress[key]['tags'] = test_tags
- # notify completed tests.
- if test_status not in (None, 'inprogress'):
- self.on_test(self._inprogress.pop(key))
+ case = case.set('tags', test_tags)
+
+ return case
def stopTestRun(self):
- super(StreamToDict, self).stopTestRun()
+ super(_StreamToTestRecord, self).stopTestRun()
while self._inprogress:
case = self._inprogress.popitem()[1]
- case['timestamps'][1] = None
- self.on_test(case)
+ self.on_test(case.got_timestamp(None))
def _ensure_key(self, test_id, route_code, timestamp):
if test_id is None:
return
key = (test_id, route_code)
if key not in self._inprogress:
- self._inprogress[key] = {
- 'id': test_id,
- 'tags': set(),
- 'details': {},
- 'status': 'unknown',
- 'timestamps': [timestamp, None]}
+ self._inprogress[key] = _TestRecord.create(test_id, timestamp)
return key
-_status_map = {
- 'inprogress': 'addFailure',
- 'unknown': 'addFailure',
- 'success': 'addSuccess',
- 'skip': 'addSkip',
- 'fail': 'addFailure',
- 'xfail': 'addExpectedFailure',
- 'uxsuccess': 'addUnexpectedSuccess',
- }
+class StreamToDict(StreamResult):
+ """A specialised StreamResult that emits a callback as tests complete.
+
+ Top level file attachments are simply discarded. Hung tests are detected
+ by stopTestRun and notified there and then.
+
+ The callback is passed a dict with the following keys:
+
+ * id: the test id.
+ * tags: The tags for the test. A set of unicode strings.
+ * details: A dict of file attachments - ``testtools.content.Content``
+ objects.
+ * status: One of the StreamResult status codes (including inprogress) or
+ 'unknown' (used if only file events for a test were received...)
+ * timestamps: A pair of timestamps - the first one received with this
+ test id, and the one in the event that triggered the notification.
+ Hung tests have a None for the second end event. Timestamps are not
+ compared - their ordering is purely order received in the stream.
+
+ Only the most recent tags observed in the stream are reported.
+ """
+
+ # XXX: This could actually be replaced by a very simple function.
+ # Unfortunately, subclassing is a supported API.
+
+ # XXX: Alternative simplification is to extract a StreamAdapter base
+ # class, and have this inherit from that.
+
+ def __init__(self, on_test):
+ """Create a _StreamToTestRecord calling on_test on test completions.
+
+ :param on_test: A callback that accepts one parameter:
+ a dictionary describing a test.
+ """
+ super(StreamToDict, self).__init__()
+ self._hook = _StreamToTestRecord(self._handle_test)
+ # XXX: Not clear whether its part of the supported interface for
+ # self.on_test to be the passed-in on_test. If not, we could reduce
+ # the boilerplate by subclassing _StreamToTestRecord.
+ self.on_test = on_test
+
+ def _handle_test(self, test_record):
+ self.on_test(test_record.to_dict())
+
+ def startTestRun(self):
+ super(StreamToDict, self).startTestRun()
+ self._hook.startTestRun()
+
+ def status(self, *args, **kwargs):
+ super(StreamToDict, self).status(*args, **kwargs)
+ self._hook.status(*args, **kwargs)
+
+ def stopTestRun(self):
+ super(StreamToDict, self).stopTestRun()
+ self._hook.stopTestRun()
def test_dict_to_case(test_dict):
@@ -706,17 +913,16 @@ def test_dict_to_case(test_dict):
:param test_dict: A test dict as generated by StreamToDict.
:return: A PlaceHolder test object.
"""
- # Circular import.
- global PlaceHolder
- if PlaceHolder is None:
- from testtools.testcase import PlaceHolder
- outcome = _status_map[test_dict['status']]
- return PlaceHolder(
- test_dict['id'], outcome=outcome, details=test_dict['details'],
- tags=test_dict['tags'], timestamps=test_dict['timestamps'])
+ return _TestRecord(
+ id=test_dict['id'],
+ tags=test_dict['tags'],
+ details=test_dict['details'],
+ status=test_dict['status'],
+ timestamps=tuple(test_dict['timestamps']),
+ ).to_test_case()
-class StreamSummary(StreamToDict):
+class StreamSummary(StreamResult):
"""A specialised StreamResult that summarises a stream.
The summary uses the same representation as the original
@@ -725,7 +931,8 @@ class StreamSummary(StreamToDict):
"""
def __init__(self):
- super(StreamSummary, self).__init__(self._gather_test)
+ super(StreamSummary, self).__init__()
+ self._hook = _StreamToTestRecord(self._gather_test)
self._handle_status = {
'success': self._success,
'skip': self._skip,
@@ -745,6 +952,15 @@ class StreamSummary(StreamToDict):
self.skipped = []
self.expectedFailures = []
self.unexpectedSuccesses = []
+ self._hook.startTestRun()
+
+ def status(self, *args, **kwargs):
+ super(StreamSummary, self).status(*args, **kwargs)
+ self._hook.status(*args, **kwargs)
+
+ def stopTestRun(self):
+ super(StreamSummary, self).stopTestRun()
+ self._hook.stopTestRun()
def wasSuccessful(self):
"""Return False if any failure has occured.
@@ -754,12 +970,12 @@ class StreamSummary(StreamToDict):
"""
return (not self.failures and not self.errors)
- def _gather_test(self, test_dict):
- if test_dict['status'] == 'exists':
+ def _gather_test(self, test_record):
+ if test_record.status == 'exists':
return
self.testsRun += 1
- case = test_dict_to_case(test_dict)
- self._handle_status[test_dict['status']](case)
+ case = test_record.to_test_case()
+ self._handle_status[test_record.status](case)
def _incomplete(self, case):
self.errors.append((case, "Test did not complete"))
@@ -1472,8 +1688,8 @@ class StreamToExtendedDecorator(StreamResult):
# ExtendedToOriginalDecorator takes care of thunking details back to
# exceptions/reasons etc.
self.decorated = ExtendedToOriginalDecorator(decorated)
- # StreamToDict buffers and gives us individual tests.
- self.hook = StreamToDict(self._handle_tests)
+ # _StreamToTestRecord buffers and gives us individual tests.
+ self.hook = _StreamToTestRecord(self._handle_tests)
def status(self, test_id=None, test_status=None, *args, **kwargs):
if test_status == 'exists':
@@ -1489,8 +1705,8 @@ class StreamToExtendedDecorator(StreamResult):
self.hook.stopTestRun()
self.decorated.stopTestRun()
- def _handle_tests(self, test_dict):
- case = test_dict_to_case(test_dict)
+ def _handle_tests(self, test_record):
+ case = test_record.to_test_case()
case.run(self.decorated)