summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Lange <jml@mumak.net>2015-11-02 18:38:11 +0000
committerJonathan Lange <jml@mumak.net>2015-11-02 21:11:30 +0000
commit58bcab673d0858b0573f199393c3888071d00d6f (patch)
treef5a09cdf345f77960c8ec63639862af0ab9cd733
parent5e91d1d2c6ad2bb52e302d82d98f289b5ec57aae (diff)
downloadtesttools-58bcab673d0858b0573f199393c3888071d00d6f.tar.gz
Fix lint in testresult.real
-rw-r--r--testtools/testresult/real.py129
1 files changed, 73 insertions, 56 deletions
diff --git a/testtools/testresult/real.py b/testtools/testresult/real.py
index 44e3468..5ecd24d 100644
--- a/testtools/testresult/real.py
+++ b/testtools/testresult/real.py
@@ -47,7 +47,6 @@ PlaceHolder = None
# From http://docs.python.org/library/datetime.html
_ZERO = datetime.timedelta(0)
-# A UTC class.
class UTC(datetime.tzinfo):
"""UTC"""
@@ -110,8 +109,8 @@ class TestResult(unittest.TestResult):
:param details: Alternative way to supply details about the outcome.
see the class docstring for more information.
"""
- self.errors.append((test,
- self._err_details_to_string(test, err, details)))
+ self.errors.append(
+ (test, self._err_details_to_string(test, err, details)))
if self.failfast:
self.stop()
@@ -122,8 +121,8 @@ class TestResult(unittest.TestResult):
:param details: Alternative way to supply details about the outcome.
see the class docstring for more information.
"""
- self.failures.append((test,
- self._err_details_to_string(test, err, details)))
+ self.failures.append(
+ (test, self._err_details_to_string(test, err, details)))
if self.failfast:
self.stop()
@@ -325,8 +324,8 @@ class StreamResult(object):
"""
def status(self, test_id=None, test_status=None, test_tags=None,
- runnable=True, file_name=None, file_bytes=None, eof=False,
- mime_type=None, route_code=None, timestamp=None):
+ runnable=True, file_name=None, file_bytes=None, eof=False,
+ mime_type=None, route_code=None, timestamp=None):
"""Inform the result about a test status.
:param test_id: The test whose status is being reported. None to
@@ -342,24 +341,24 @@ class StreamResult(object):
* None - no particular status is being reported, or status being
reported is not associated with a test (e.g. when reporting on
stdout / stderr chatter).
- * inprogress - the test is currently running. Emitted by tests when
- they start running and at any intermediary point they might
- choose to indicate their continual operation.
+ * inprogress - the test is currently running. Emitted by tests
+ when they start running and at any intermediary point they
+ might choose to indicate their continual operation.
Final states:
* exists - the test exists. This is used when a test is not being
- executed. Typically this is when querying what tests could be run
- in a test run (which is useful for selecting tests to run).
+ executed. Typically this is when querying what tests could be
+ run in a test run (which is useful for selecting tests to run).
* xfail - the test failed but that was expected. This is purely
informative - the test is not considered to be a failure.
* uxsuccess - the test passed but was expected to fail. The test
will be considered a failure.
* success - the test has finished without error.
- * fail - the test failed (or errored). The test will be considered
- a failure.
- * skip - the test was selected to run but chose to be skipped. E.g.
- a test dependency was missing. This is purely informative - the
- test is not considered to be a failure.
+ * fail - the test failed (or errored). The test will be
+ considered a failure.
+ * skip - the test was selected to run but chose to be skipped.
+ e.g. a test dependency was missing. This is purely informative-
+ the test is not considered to be a failure.
:param test_tags: Optional set of tags to apply to the test. Tags
have no intrinsic meaning - that is up to the test author.
@@ -428,8 +427,8 @@ class StreamFailFast(StreamResult):
self.on_error = on_error
def status(self, test_id=None, test_status=None, test_tags=None,
- runnable=True, file_name=None, file_bytes=None, eof=False,
- mime_type=None, route_code=None, timestamp=None):
+ runnable=True, file_name=None, file_bytes=None, eof=False,
+ mime_type=None, route_code=None, timestamp=None):
if test_status in ('uxsuccess', 'fail'):
self.on_error()
@@ -449,7 +448,8 @@ class StreamResultRouter(StreamResult):
>>> sink = doubles.StreamResult()
>>> router.add_rule(sink, 'route_code_prefix', route_prefix='0',
... consume_route=True)
- >>> router.status(test_id='foo', route_code='0/1', test_status='uxsuccess')
+ >>> router.status(
+ ... test_id='foo', route_code='0/1', test_status='uxsuccess')
StreamResultRouter has no buffering.
@@ -617,9 +617,10 @@ class StreamToDict(StreamResult):
self._inprogress = {}
def status(self, test_id=None, test_status=None, test_tags=None,
- runnable=True, file_name=None, file_bytes=None, eof=False,
- mime_type=None, route_code=None, timestamp=None):
- super(StreamToDict, self).status(test_id, test_status,
+ runnable=True, file_name=None, file_bytes=None, eof=False,
+ mime_type=None, route_code=None, timestamp=None):
+ super(StreamToDict, self).status(
+ test_id, test_status,
test_tags=test_tags, runnable=runnable, file_name=file_name,
file_bytes=file_bytes, eof=eof, mime_type=mime_type,
route_code=route_code, timestamp=timestamp)
@@ -646,7 +647,7 @@ class StreamToDict(StreamResult):
content_type = ContentType(primary, sub, parameters)
content_bytes = []
case['details'][file_name] = Content(
- content_type, lambda:content_bytes)
+ content_type, lambda: content_bytes)
case['details'][file_name].iter_bytes().append(file_bytes)
if test_tags is not None:
self._inprogress[key]['tags'] = test_tags
@@ -698,8 +699,8 @@ def test_dict_to_case(test_dict):
from testtools.testcase import PlaceHolder
outcome = _status_map[test_dict['status']]
return PlaceHolder(test_dict['id'], outcome=outcome,
- details=test_dict['details'], tags=test_dict['tags'],
- timestamps=test_dict['timestamps'])
+ details=test_dict['details'], tags=test_dict['tags'],
+ timestamps=test_dict['timestamps'])
class StreamSummary(StreamToDict):
@@ -812,12 +813,14 @@ class MultiTestResult(TestResult):
def _get_failfast(self):
return getattr(self._results[0], 'failfast', False)
+
def _set_failfast(self, value):
self._dispatch('__setattr__', 'failfast', value)
failfast = property(_get_failfast, _set_failfast)
def _get_shouldStop(self):
return any(self._dispatch('__getattr__', 'shouldStop'))
+
def _set_shouldStop(self, value):
# Called because we subclass TestResult. Probably should not do that.
pass
@@ -895,8 +898,9 @@ class TextTestResult(TestResult):
# to decide whether to round/floor/ceiling. This was added when we
# had pyp3 test failures that suggest a floor was happening.
shift = 10 ** precision
- return math.ceil((a_timedelta.days * 86400.0 + a_timedelta.seconds +
- a_timedelta.microseconds / 1000000.0) * shift) / shift
+ return math.ceil(
+ (a_timedelta.days * 86400.0 + a_timedelta.seconds +
+ a_timedelta.microseconds / 1000000.0) * shift) / shift
def _show_list(self, label, error_list):
for test, output in error_list:
@@ -922,9 +926,10 @@ class TextTestResult(TestResult):
self.stream.write(
"%sUNEXPECTED SUCCESS: %s\n%s" % (
self.sep1, test.id(), self.sep2))
- self.stream.write("\nRan %d test%s in %.3fs\n" %
- (self.testsRun, plural,
- self._delta_to_float(stop - self.__start, 3)))
+ self.stream.write(
+ "\nRan %d test%s in %.3fs\n" % (
+ self.testsRun, plural,
+ self._delta_to_float(stop - self.__start, 3)))
if self.wasSuccessful():
self.stream.write("OK\n")
else:
@@ -1004,28 +1009,28 @@ class ThreadsafeForwardingResult(TestResult):
self._test_start = None
def addError(self, test, err=None, details=None):
- self._add_result_with_semaphore(self.result.addError,
- test, err, details=details)
+ self._add_result_with_semaphore(
+ self.result.addError, test, err, details=details)
def addExpectedFailure(self, test, err=None, details=None):
- self._add_result_with_semaphore(self.result.addExpectedFailure,
- test, err, details=details)
+ self._add_result_with_semaphore(
+ self.result.addExpectedFailure, test, err, details=details)
def addFailure(self, test, err=None, details=None):
- self._add_result_with_semaphore(self.result.addFailure,
- test, err, details=details)
+ self._add_result_with_semaphore(
+ self.result.addFailure, test, err, details=details)
def addSkip(self, test, reason=None, details=None):
- self._add_result_with_semaphore(self.result.addSkip,
- test, reason, details=details)
+ self._add_result_with_semaphore(
+ self.result.addSkip, test, reason, details=details)
def addSuccess(self, test, details=None):
- self._add_result_with_semaphore(self.result.addSuccess,
- test, details=details)
+ self._add_result_with_semaphore(
+ self.result.addSuccess, test, details=details)
def addUnexpectedSuccess(self, test, details=None):
- self._add_result_with_semaphore(self.result.addUnexpectedSuccess,
- test, details=details)
+ self._add_result_with_semaphore(
+ self.result.addUnexpectedSuccess, test, details=details)
def progress(self, offset, whence):
pass
@@ -1044,6 +1049,7 @@ class ThreadsafeForwardingResult(TestResult):
return self.result.shouldStop
finally:
self.semaphore.release()
+
def _set_shouldStop(self, value):
# Another case where we should not subclass TestResult
pass
@@ -1212,7 +1218,8 @@ class ExtendedToOriginalDecorator(object):
if details is not None:
param_count += 1
if param_count != 1:
- raise ValueError("Must pass only one of err '%s' and details '%s"
+ raise ValueError(
+ "Must pass only one of err '%s' and details '%s"
% (err, details))
def _details_to_exc_info(self, details):
@@ -1235,6 +1242,7 @@ class ExtendedToOriginalDecorator(object):
def _get_failfast(self):
return getattr(self.decorated, 'failfast', self._failfast)
+
def _set_failfast(self, value):
if safe_hasattr(self.decorated, 'failfast'):
self.decorated.failfast = value
@@ -1311,6 +1319,7 @@ class ExtendedToStreamDecorator(CopyStreamResult, StreamSummary, TestControl):
def _get_failfast(self):
return len(self.targets) == 2
+
def _set_failfast(self, value):
if value:
if len(self.targets) == 2:
@@ -1323,7 +1332,8 @@ class ExtendedToStreamDecorator(CopyStreamResult, StreamSummary, TestControl):
def startTest(self, test):
if not self._started:
self.startTestRun()
- self.status(test_id=test.id(), test_status='inprogress', timestamp=self._now())
+ self.status(
+ test_id=test.id(), test_status='inprogress', timestamp=self._now())
self._tags = TagContext(self._tags)
def stopTest(self, test):
@@ -1349,18 +1359,23 @@ class ExtendedToStreamDecorator(CopyStreamResult, StreamSummary, TestControl):
file_bytes = None
for next_bytes in content.iter_bytes():
if file_bytes is not None:
- self.status(file_name=name, file_bytes=file_bytes,
- mime_type=mime_type, test_id=test_id, timestamp=now)
+ self.status(
+ file_name=name, file_bytes=file_bytes,
+ mime_type=mime_type, test_id=test_id,
+ timestamp=now)
file_bytes = next_bytes
if file_bytes is None:
file_bytes = _b("")
- self.status(file_name=name, file_bytes=file_bytes, eof=True,
+ self.status(
+ file_name=name, file_bytes=file_bytes, eof=True,
mime_type=mime_type, test_id=test_id, timestamp=now)
if reason is not None:
- self.status(file_name='reason', file_bytes=reason.encode('utf8'),
+ self.status(
+ file_name='reason', file_bytes=reason.encode('utf8'),
eof=True, mime_type="text/plain; charset=utf8",
test_id=test_id, timestamp=now)
- self.status(test_id=test_id, test_status=status,
+ self.status(
+ test_id=test_id, test_status=status,
test_tags=self.current_tags, timestamp=now)
def addExpectedFailure(self, test, err=None, details=None):
@@ -1383,7 +1398,8 @@ class ExtendedToStreamDecorator(CopyStreamResult, StreamSummary, TestControl):
if details is not None:
param_count += 1
if param_count != 1:
- raise ValueError("Must pass only one of err '%s' and details '%s"
+ raise ValueError(
+ "Must pass only one of err '%s' and details '%s"
% (err, details))
def startTestRun(self):
@@ -1512,9 +1528,10 @@ class StreamToQueue(StreamResult):
self.queue.put(dict(event='startTestRun', result=self))
def status(self, test_id=None, test_status=None, test_tags=None,
- runnable=True, file_name=None, file_bytes=None, eof=False,
- mime_type=None, route_code=None, timestamp=None):
- self.queue.put(dict(event='status', test_id=test_id,
+ runnable=True, file_name=None, file_bytes=None, eof=False,
+ mime_type=None, route_code=None, timestamp=None):
+ self.queue.put(dict(
+ event='status', test_id=test_id,
test_status=test_status, test_tags=test_tags, runnable=runnable,
file_name=file_name, file_bytes=file_bytes, eof=eof,
mime_type=mime_type, route_code=self.route_code(route_code),
@@ -1718,8 +1735,8 @@ class _StringException(Exception):
if not str_is_unicode:
def __init__(self, string):
if type(string) is not unicode:
- raise TypeError("_StringException expects unicode, got %r" %
- (string,))
+ raise TypeError(
+ "_StringException expects unicode, got %r" % (string,))
Exception.__init__(self, string)
def __str__(self):