summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorJohn Griffith <john.griffith@solidfire.com>2013-05-31 13:42:17 -0600
committerJohn Griffith <john.griffith@solidfire.com>2013-05-31 13:45:13 -0600
commit2e58e73a0cdfae43ca206abcc1de0743324d8b68 (patch)
treec86b1e0676b4d230742bce7ce9c4ba85923d3001 /tools
parentbde6efb65c8a7b2e0bc08e6d9f187e5654cfba22 (diff)
downloadpython-cinderclient-2e58e73a0cdfae43ca206abcc1de0743324d8b68.tar.gz
Update run_tests and bring back colorizer.
This patch adds output of tests and their results to run_tests.sh. It also brings back colorizer to the output and updates the test-requirements. Should align with cinder changes that are in progress at: https://review.openstack.org/#/c/30291/ Change-Id: I3df6d861f4b4d4355464ceb2d507e69bcf682fbe
Diffstat (limited to 'tools')
-rwxr-xr-xtools/colorizer.py335
1 files changed, 335 insertions, 0 deletions
diff --git a/tools/colorizer.py b/tools/colorizer.py
new file mode 100755
index 0000000..a49abb1
--- /dev/null
+++ b/tools/colorizer.py
@@ -0,0 +1,335 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013, Nebula, Inc.
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# Colorizer Code is borrowed from Twisted:
+# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+"""Display a subunit stream through a colorized unittest test runner."""
+
+import heapq
+import subunit
+import sys
+import unittest
+
+import testtools
+
+
+class _AnsiColorizer(object):
+ """
+ A colorizer is an object that loosely wraps around a stream, allowing
+ callers to write text to the stream in a particular color.
+
+ Colorizer classes must implement C{supported()} and C{write(text, color)}.
+ """
+ _colors = dict(black=30, red=31, green=32, yellow=33,
+ blue=34, magenta=35, cyan=36, white=37)
+
+ def __init__(self, stream):
+ self.stream = stream
+
+ def supported(cls, stream=sys.stdout):
+ """
+ A class method that returns True if the current platform supports
+ coloring terminal output using this method. Returns False otherwise.
+ """
+ if not stream.isatty():
+ return False # auto color only on TTYs
+ try:
+ import curses
+ except ImportError:
+ return False
+ else:
+ try:
+ try:
+ return curses.tigetnum("colors") > 2
+ except curses.error:
+ curses.setupterm()
+ return curses.tigetnum("colors") > 2
+ except Exception:
+ # guess false in case of error
+ return False
+ supported = classmethod(supported)
+
+ def write(self, text, color):
+ """
+ Write the given text to the stream in the given color.
+
+ @param text: Text to be written to the stream.
+
+ @param color: A string label for a color. e.g. 'red', 'white'.
+ """
+ color = self._colors[color]
+ self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
+
+
+class _Win32Colorizer(object):
+ """
+ See _AnsiColorizer docstring.
+ """
+ def __init__(self, stream):
+ import win32console
+ red, green, blue, bold = (win32console.FOREGROUND_RED,
+ win32console.FOREGROUND_GREEN,
+ win32console.FOREGROUND_BLUE,
+ win32console.FOREGROUND_INTENSITY)
+ self.stream = stream
+ self.screenBuffer = win32console.GetStdHandle(
+ win32console.STD_OUT_HANDLE)
+ self._colors = {
+ 'normal': red | green | blue,
+ 'red': red | bold,
+ 'green': green | bold,
+ 'blue': blue | bold,
+ 'yellow': red | green | bold,
+ 'magenta': red | blue | bold,
+ 'cyan': green | blue | bold,
+ 'white': red | green | blue | bold
+ }
+
+ def supported(cls, stream=sys.stdout):
+ try:
+ import win32console
+ screenBuffer = win32console.GetStdHandle(
+ win32console.STD_OUT_HANDLE)
+ except ImportError:
+ return False
+ import pywintypes
+ try:
+ screenBuffer.SetConsoleTextAttribute(
+ win32console.FOREGROUND_RED |
+ win32console.FOREGROUND_GREEN |
+ win32console.FOREGROUND_BLUE)
+ except pywintypes.error:
+ return False
+ else:
+ return True
+ supported = classmethod(supported)
+
+ def write(self, text, color):
+ color = self._colors[color]
+ self.screenBuffer.SetConsoleTextAttribute(color)
+ self.stream.write(text)
+ self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
+
+
+class _NullColorizer(object):
+ """
+ See _AnsiColorizer docstring.
+ """
+ def __init__(self, stream):
+ self.stream = stream
+
+ def supported(cls, stream=sys.stdout):
+ return True
+ supported = classmethod(supported)
+
+ def write(self, text, color):
+ self.stream.write(text)
+
+
+def get_elapsed_time_color(elapsed_time):
+ if elapsed_time > 1.0:
+ return 'red'
+ elif elapsed_time > 0.25:
+ return 'yellow'
+ else:
+ return 'green'
+
+
+class NovaTestResult(testtools.TestResult):
+ def __init__(self, stream, descriptions, verbosity):
+ super(NovaTestResult, self).__init__()
+ self.stream = stream
+ self.showAll = verbosity > 1
+ self.num_slow_tests = 10
+ self.slow_tests = [] # this is a fixed-sized heap
+ self.colorizer = None
+ # NOTE(vish): reset stdout for the terminal check
+ stdout = sys.stdout
+ sys.stdout = sys.__stdout__
+ for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
+ if colorizer.supported():
+ self.colorizer = colorizer(self.stream)
+ break
+ sys.stdout = stdout
+ self.start_time = None
+ self.last_time = {}
+ self.results = {}
+ self.last_written = None
+
+ def _writeElapsedTime(self, elapsed):
+ color = get_elapsed_time_color(elapsed)
+ self.colorizer.write(" %.2f" % elapsed, color)
+
+ def _addResult(self, test, *args):
+ try:
+ name = test.id()
+ except AttributeError:
+ name = 'Unknown.unknown'
+ test_class, test_name = name.rsplit('.', 1)
+
+ elapsed = (self._now() - self.start_time).total_seconds()
+ item = (elapsed, test_class, test_name)
+ if len(self.slow_tests) >= self.num_slow_tests:
+ heapq.heappushpop(self.slow_tests, item)
+ else:
+ heapq.heappush(self.slow_tests, item)
+
+ self.results.setdefault(test_class, [])
+ self.results[test_class].append((test_name, elapsed) + args)
+ self.last_time[test_class] = self._now()
+ self.writeTests()
+
+ def _writeResult(self, test_name, elapsed, long_result, color,
+ short_result, success):
+ if self.showAll:
+ self.stream.write(' %s' % str(test_name).ljust(66))
+ self.colorizer.write(long_result, color)
+ if success:
+ self._writeElapsedTime(elapsed)
+ self.stream.writeln()
+ else:
+ self.colorizer.write(short_result, color)
+
+ def addSuccess(self, test):
+ super(NovaTestResult, self).addSuccess(test)
+ self._addResult(test, 'OK', 'green', '.', True)
+
+ def addFailure(self, test, err):
+ if test.id() == 'process-returncode':
+ return
+ super(NovaTestResult, self).addFailure(test, err)
+ self._addResult(test, 'FAIL', 'red', 'F', False)
+
+ def addError(self, test, err):
+ super(NovaTestResult, self).addFailure(test, err)
+ self._addResult(test, 'ERROR', 'red', 'E', False)
+
+ def addSkip(self, test, reason=None, details=None):
+ super(NovaTestResult, self).addSkip(test, reason, details)
+ self._addResult(test, 'SKIP', 'blue', 'S', True)
+
+ def startTest(self, test):
+ self.start_time = self._now()
+ super(NovaTestResult, self).startTest(test)
+
+ def writeTestCase(self, cls):
+ if not self.results.get(cls):
+ return
+ if cls != self.last_written:
+ self.colorizer.write(cls, 'white')
+ self.stream.writeln()
+ for result in self.results[cls]:
+ self._writeResult(*result)
+ del self.results[cls]
+ self.stream.flush()
+ self.last_written = cls
+
+ def writeTests(self):
+ time = self.last_time.get(self.last_written, self._now())
+ if not self.last_written or (self._now() - time).total_seconds() > 2.0:
+ diff = 3.0
+ while diff > 2.0:
+ classes = self.results.keys()
+ oldest = min(classes, key=lambda x: self.last_time[x])
+ diff = (self._now() - self.last_time[oldest]).total_seconds()
+ self.writeTestCase(oldest)
+ else:
+ self.writeTestCase(self.last_written)
+
+ def done(self):
+ self.stopTestRun()
+
+ def stopTestRun(self):
+ for cls in list(self.results.iterkeys()):
+ self.writeTestCase(cls)
+ self.stream.writeln()
+ self.writeSlowTests()
+
+ def writeSlowTests(self):
+ # Pare out 'fast' tests
+ slow_tests = [item for item in self.slow_tests
+ if get_elapsed_time_color(item[0]) != 'green']
+ if slow_tests:
+ slow_total_time = sum(item[0] for item in slow_tests)
+ slow = ("Slowest %i tests took %.2f secs:"
+ % (len(slow_tests), slow_total_time))
+ self.colorizer.write(slow, 'yellow')
+ self.stream.writeln()
+ last_cls = None
+ # sort by name
+ for elapsed, cls, name in sorted(slow_tests,
+ key=lambda x: x[1] + x[2]):
+ if cls != last_cls:
+ self.colorizer.write(cls, 'white')
+ self.stream.writeln()
+ last_cls = cls
+ self.stream.write(' %s' % str(name).ljust(68))
+ self._writeElapsedTime(elapsed)
+ self.stream.writeln()
+
+ def printErrors(self):
+ if self.showAll:
+ self.stream.writeln()
+ self.printErrorList('ERROR', self.errors)
+ self.printErrorList('FAIL', self.failures)
+
+ def printErrorList(self, flavor, errors):
+ for test, err in errors:
+ self.colorizer.write("=" * 70, 'red')
+ self.stream.writeln()
+ self.colorizer.write(flavor, 'red')
+ self.stream.writeln(": %s" % test.id())
+ self.colorizer.write("-" * 70, 'red')
+ self.stream.writeln()
+ self.stream.writeln("%s" % err)
+
+
+test = subunit.ProtocolTestCase(sys.stdin, passthrough=None)
+
+if sys.version_info[0:2] <= (2, 6):
+ runner = unittest.TextTestRunner(verbosity=2)
+else:
+ runner = unittest.TextTestRunner(verbosity=2, resultclass=NovaTestResult)
+
+if runner.run(test).wasSuccessful():
+ exit_code = 0
+else:
+ exit_code = 1
+sys.exit(exit_code)