summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--NEWS8
-rw-r--r--testrepository/commands/load.py35
-rw-r--r--testrepository/repository/memory.py5
-rw-r--r--testrepository/results.py2
-rw-r--r--testrepository/setuptools_command.py2
-rw-r--r--testrepository/testcommand.py136
-rw-r--r--testrepository/tests/commands/test_load.py14
-rw-r--r--testrepository/tests/commands/test_run.py1
-rw-r--r--testrepository/tests/test_testcommand.py27
-rw-r--r--testrepository/tests/ui/test_cli.py2
10 files changed, 135 insertions, 97 deletions
diff --git a/NEWS b/NEWS
index 419c19e..accde30 100644
--- a/NEWS
+++ b/NEWS
@@ -8,6 +8,9 @@ NEXT (In development)
CHANGES
-------
+* Fix Python 3.* support for entrypoints; the initial code was Python3
+ incompatible. (Robert Collins, Clark Boylan, #1187192)
+
* Switch to using multiprocessing to determine CPU counts.
(Chris Jones, #1092276)
@@ -16,6 +19,11 @@ CHANGES
load command to take interactive input without it reading from the raw
subunit stream on stdin. (Robert Collins)
+* The scheduler can now groups tests together permitting co-dependent tests to
+ always be scheduled onto the same backend. Note that this does not force
+ co-dependent tests to be executed, so partial test runs (e.g. --failing)
+ may still fail. (Matthew Treinish, Robert Collins)
+
0.0.15
++++++
diff --git a/testrepository/commands/load.py b/testrepository/commands/load.py
index 96d15f5..0ca4688 100644
--- a/testrepository/commands/load.py
+++ b/testrepository/commands/load.py
@@ -17,6 +17,7 @@
from functools import partial
from operator import methodcaller
import optparse
+import threading
from extras import try_import
v2_avail = try_import('subunit.ByteStreamToStreamResult')
@@ -29,6 +30,26 @@ from testrepository.commands import Command
from testrepository.repository import RepositoryNotFound
from testrepository.testcommand import TestCommand
+class InputToStreamResult(object):
+ """Generate Stream events from stdin.
+
+ Really a UI responsibility?
+ """
+
+ def __init__(self, stream):
+ self.source = stream
+ self.stop = False
+
+ def run(self, result):
+ while True:
+ if self.stop:
+ return
+ char = self.source.read(1)
+ if not char:
+ return
+ if char == b'a':
+ result.status(test_id='stdin', test_status='fail')
+
class load(Command):
"""Load a subunit stream into a repository.
@@ -39,7 +60,7 @@ class load(Command):
Unless the stream is a partial stream, any existing failures are discarded.
"""
- input_streams = ['subunit+']
+ input_streams = ['subunit+', 'interactive?']
args = [ExistingPathArgument('streams', min=0, max=None)]
options = [
@@ -116,11 +137,23 @@ class load(Command):
output_result, summary_result = self.ui.make_result(
inserter.get_id, testcommand, previous_run=previous_run)
result = testtools.CopyStreamResult([inserter, output_result])
+ runner_thread = None
result.startTestRun()
try:
+ # Convert user input into a stdin event stream
+ interactive_streams = list(self.ui.iter_streams('interactive'))
+ if interactive_streams:
+ case = InputToStreamResult(interactive_streams[0])
+ runner_thread = threading.Thread(
+ target=case.run, args=(result,))
+ runner_thread.daemon = True
+ runner_thread.start()
case.run(result)
finally:
result.stopTestRun()
+ if interactive_streams and runner_thread:
+ runner_thread.stop = True
+ runner_thread.join(10)
if not summary_result.wasSuccessful():
return 1
else:
diff --git a/testrepository/repository/memory.py b/testrepository/repository/memory.py
index 8aee71f..07eb667 100644
--- a/testrepository/repository/memory.py
+++ b/testrepository/repository/memory.py
@@ -14,6 +14,7 @@
"""In memory storage of test results."""
+from collections import OrderedDict
from io import BytesIO
from operator import methodcaller
@@ -55,7 +56,7 @@ class Repository(AbstractRepository):
def __init__(self):
# Test runs:
self._runs = []
- self._failing = {} # id -> test
+ self._failing = OrderedDict() # id -> test
self._times = {} # id -> duration
def count(self):
@@ -157,7 +158,7 @@ class _Inserter(AbstractTestRun):
self._repository._runs.append(self)
self._run_id = len(self._repository._runs) - 1
if not self._partial:
- self._repository._failing = {}
+ self._repository._failing = OrderedDict()
for test_dict in self._tests:
test_id = test_dict['id']
if test_dict['status'] == 'fail':
diff --git a/testrepository/results.py b/testrepository/results.py
index 8938314..ed01856 100644
--- a/testrepository/results.py
+++ b/testrepository/results.py
@@ -29,7 +29,7 @@ class SummarizingResult(StreamSummary):
self._last_time = None
def status(self, *args, **kwargs):
- if 'timestamp' in kwargs:
+ if kwargs.get('timestamp') is not None:
timestamp = kwargs['timestamp']
if self._last_time is None:
self._first_time = timestamp
diff --git a/testrepository/setuptools_command.py b/testrepository/setuptools_command.py
index 2718e3e..fbaf606 100644
--- a/testrepository/setuptools_command.py
+++ b/testrepository/setuptools_command.py
@@ -77,7 +77,7 @@ class Testr(cmd.Command):
raise distutils.errors.DistutilsError(
"testr failed (%d)" % testr_ret)
if self.slowest:
- print "Slowest Tests"
+ print ("Slowest Tests")
self._run_testr("slowest")
if self.coverage:
self._coverage_after()
diff --git a/testrepository/testcommand.py b/testrepository/testcommand.py
index 65a8f61..d93881d 100644
--- a/testrepository/testcommand.py
+++ b/testrepository/testcommand.py
@@ -16,6 +16,7 @@
from extras import try_imports
+from collections import defaultdict
ConfigParser = try_imports(['ConfigParser', 'configparser'])
import itertools
import operator
@@ -134,7 +135,7 @@ class TestListingFixture(Fixture):
def __init__(self, test_ids, cmd_template, listopt, idoption, ui,
repository, parallel=True, listpath=None, parser=None,
- test_filters=None, instance_source=None, group_regex=None):
+ test_filters=None, instance_source=None, group_callback=None):
"""Create a TestListingFixture.
:param test_ids: The test_ids to use. May be None indicating that
@@ -167,8 +168,10 @@ class TestListingFixture(Fixture):
:param instance_source: A source of test run instances. Must support
obtain_instance(max_concurrency) -> id and release_instance(id)
calls.
- :param group_regex: An optional regular expression string which is used
- to provide a grouping hint to the test partitioner
+ :param group_callback: If supplied, should be a function that accepts a
+ test id and returns a group id. A group id is an arbitrary value
+ used as a dictionary key in the scheduler. All test ids with the
+ same group id are scheduled onto the same backend test process.
"""
self.test_ids = test_ids
self.template = cmd_template
@@ -180,8 +183,8 @@ class TestListingFixture(Fixture):
self._listpath = listpath
self._parser = parser
self.test_filters = test_filters
+ self._group_callback = group_callback
self._instance_source = instance_source
- self.group_regex = group_regex
def setUp(self):
super(TestListingFixture, self).setUp()
@@ -330,7 +333,6 @@ class TestListingFixture(Fixture):
:return: A list of spawned processes.
"""
result = []
- group_tags = None
test_ids = self.test_ids
if self.concurrency == 1 and (test_ids is None or test_ids):
# Have to customise cmd here, as instances are allocated
@@ -347,11 +349,8 @@ class TestListingFixture(Fixture):
return [CallWhenProcFinishes(run_proc,
lambda:self._instance_source.release_instance(instance))]
else:
- return [run_proc]
- if self.group_regex:
- group_tags = self.filter_test_groups(test_ids, self.group_regex)
- test_id_groups = self.partition_tests(test_ids, self.concurrency,
- group_tags)
+ return [run_proc]
+ test_id_groups = self.partition_tests(test_ids, self.concurrency)
for test_ids in test_id_groups:
if not test_ids:
# No tests in this partition
@@ -363,28 +362,7 @@ class TestListingFixture(Fixture):
result.extend(fixture.run_tests())
return result
- def filter_test_groups(self, test_ids, group_regex):
- """Add a group tag based on the regex provided
-
- :return A dict with the group tags as keys and a list of
- test ids that are a member of the group tag as the value
- """
-
- group_dict = {}
- expr = re.compile(group_regex)
- for test_id in test_ids:
- match = expr.match(test_id)
- if match:
- group_id = match.group(0)
- else:
- group_id = None
- if group_dict.get(group_id):
- group_dict[group_id].append(test_id)
- else:
- group_dict[group_id] = [test_id]
- return group_dict
-
- def partition_tests(self, test_ids, concurrency, group_tags=None):
+ def partition_tests(self, test_ids, concurrency):
"""Parition test_ids by concurrency.
Test durations from the repository are used to get partitions which
@@ -395,63 +373,59 @@ class TestListingFixture(Fixture):
:return: A list where each element is a distinct subset of test_ids,
and the union of all the elements is equal to set(test_ids).
"""
-
partitions = [list() for i in range(concurrency)]
timed_partitions = [[0.0, partition] for partition in partitions]
time_data = self.repository.get_test_times(test_ids)
- timed = time_data['known']
- unknown = time_data['unknown']
- # Schedule test groups by the sum of execute time for each test that is
- # a member of the group
- if group_tags:
- group_timed = {}
- group_unknown = []
- for group_tag in group_tags.keys():
- time = 0.0
- for test_id in group_tags[group_tag]:
- # If a test_id is not timed remove the whole group from the
- # timed groups dict and
- if test_id in unknown:
- if group_tag in group_timed.keys():
- group_timed.pop(group_tag, None)
- group_unknown.append(group_tag)
- break
- time = time + timed[test_id]
- group_timed[group_tag] = (group_tags[group_tag], time)
-
- queue = sorted(group_timed.items(),
- key=operator.itemgetter(1),
- reverse=True)
-
- # Sort the tests by runtime
- for group_tag, test_tuple in queue:
- test_ids = test_tuple[0]
- duration = test_tuple[1]
- timed_partitions[0][0] = timed_partitions[0][0] + duration
- # Handle groups larger than a single entry
- timed_partitions[0][1].extend(test_ids)
- timed_partitions.sort(key=lambda item: (item[0], len(item[1])))
- for partition, group_id in zip(itertools.cycle(partitions),
- group_unknown):
- partition = partition + group_tags[group_id]
- return partitions
-
+ timed_tests = time_data['known']
+ unknown_tests = time_data['unknown']
+ # Group tests: generate group_id -> test_ids.
+ group_ids = defaultdict(list)
+ if self._group_callback is None:
+ group_callback = lambda _:None
+ else:
+ group_callback = self._group_callback
+ for test_id in test_ids:
+ group_id = group_callback(test_id) or test_id
+ group_ids[group_id].append(test_id)
+ # Time groups: generate three sets of groups:
+ # - fully timed dict(group_id -> time),
+ # - partially timed dict(group_id -> time) and
+ # - unknown (set of group_id)
+ # We may in future treat partially timed different for scheduling, but
+ # at least today we just schedule them after the fully timed groups.
+ timed = {}
+ partial = {}
+ unknown = []
+ for group_id, group_tests in group_ids.items():
+ untimed_ids = unknown_tests.intersection(group_tests)
+ group_time = sum([timed_tests[test_id]
+ for test_id in untimed_ids.symmetric_difference(group_tests)])
+ if not untimed_ids:
+ timed[group_id] = group_time
+ elif group_time:
+ partial[group_id] = group_time
+ else:
+ unknown.append(group_id)
# Scheduling is NP complete in general, so we avoid aiming for
# perfection. A quick approximation that is sufficient for our general
# needs:
- # sort the tests by time
- # allocate to partitions by putting each test in to the partition with
- # the current (lowest time, shortest length)
- else:
- queue = sorted(timed.items(), key=operator.itemgetter(1), reverse=True)
- for test_id, duration in queue:
+ # sort the groups by time
+ # allocate to partitions by putting each group in to the partition with
+ # the current (lowest time, shortest length[in tests])
+ def consume_queue(groups):
+ queue = sorted(
+ groups.items(), key=operator.itemgetter(1), reverse=True)
+ for group_id, duration in queue:
timed_partitions[0][0] = timed_partitions[0][0] + duration
- timed_partitions[0][1].append(test_id)
+ timed_partitions[0][1].extend(group_ids[group_id])
timed_partitions.sort(key=lambda item:(item[0], len(item[1])))
- # Assign tests with unknown times in round robin fashion to the partitions.
- for partition, test_id in zip(itertools.cycle(partitions), unknown):
- partition.append(test_id)
- return partitions
+ consume_queue(timed)
+ consume_queue(partial)
+ # Assign groups with entirely unknown times in round robin fashion to
+ # the partitions.
+ for partition, group_id in zip(itertools.cycle(partitions), unknown):
+ partition.extend(group_ids[group_id])
+ return partitions
def callout_concurrency(self):
"""Callout for user defined concurrency."""
diff --git a/testrepository/tests/commands/test_load.py b/testrepository/tests/commands/test_load.py
index ed564c8..11c42d7 100644
--- a/testrepository/tests/commands/test_load.py
+++ b/testrepository/tests/commands/test_load.py
@@ -224,6 +224,20 @@ class TestCommandLoad(ResourcedTestCase):
self.assertEqual(0, cmd.execute())
self.assertEqual([], ui.outputs)
+ def test_load_abort_over_interactive_stream(self):
+ ui = UI([('subunit', b''), ('interactive', b'a\n')])
+ cmd = load.load(ui)
+ ui.set_command(cmd)
+ cmd.repository_factory = memory.RepositoryFactory()
+ cmd.repository_factory.initialise(ui.here)
+ ret = cmd.execute()
+ self.assertEqual(
+ [('results', Wildcard),
+ ('summary', False, 1, None, None, None,
+ [('id', 0, None), ('failures', 1, None)])],
+ ui.outputs)
+ self.assertEqual(1, ret)
+
def test_partial_passed_to_repo(self):
ui = UI([('subunit', _b(''))], [('quiet', True), ('partial', True)])
cmd = load.load(ui)
diff --git a/testrepository/tests/commands/test_run.py b/testrepository/tests/commands/test_run.py
index 41a05ad..1efc5df 100644
--- a/testrepository/tests/commands/test_run.py
+++ b/testrepository/tests/commands/test_run.py
@@ -366,7 +366,6 @@ class TestCommand(ResourcedTestCase):
def test_regex_test_filter_with_explicit_ids(self):
ui, cmd = self.get_test_ui_and_cmd(
args=('g1', '--', 'bar', 'quux'),options=[('failing', True)])
- ui.proc_outputs = ['ab-cd\nefgh\n']
cmd.repository_factory = memory.RepositoryFactory()
self.setup_repo(cmd, ui)
self.set_config(
diff --git a/testrepository/tests/test_testcommand.py b/testrepository/tests/test_testcommand.py
index bbe4ac8..b5b3374 100644
--- a/testrepository/tests/test_testcommand.py
+++ b/testrepository/tests/test_testcommand.py
@@ -362,7 +362,7 @@ class TestTestCommand(ResourcedTestCase):
self.assertEqual(1, len(partitions[0]))
self.assertEqual(1, len(partitions[1]))
- def test_partition_tests_with_group_regex(self):
+ def test_partition_tests_with_grouping(self):
repo = memory.RepositoryFactory().initialise('memory:')
result = repo.get_inserter()
result.startTestRun()
@@ -381,17 +381,26 @@ class TestTestCommand(ResourcedTestCase):
'TestCase2.fast2', 'TestCase4.test',
'testdir.testfile.TestCase5.test'])
regex = 'TestCase[0-5]'
- group_tags = fixture.filter_test_groups(test_ids, regex)
- partitions = fixture.partition_tests(test_ids, 2, group_tags)
+ def group_id(test_id, regex=re.compile('TestCase[0-5]')):
+ match = regex.match(test_id)
+ if match:
+ return match.group(0)
+ # There isn't a public way to define a group callback [as yet].
+ fixture._group_callback = group_id
+ partitions = fixture.partition_tests(test_ids, 2)
+ # Timed groups are deterministic:
+ self.assertTrue('TestCase2.fast1' in partitions[0])
+ self.assertTrue('TestCase2.fast2' in partitions[0])
self.assertTrue('TestCase1.slow' in partitions[1])
self.assertTrue('TestCase1.fast' in partitions[1])
self.assertTrue('TestCase1.fast2' in partitions[1])
- self.assertTrue('TestCase3.test2' in partitions[1])
- self.assertTrue('TestCase3.test1' in partitions[1])
- self.assertTrue('TestCase4.test' in partitions[1])
- self.assertTrue('testdir.testfile.TestCase5.test' in partitions[0])
- self.assertTrue('TestCase2.fast1' in partitions[0])
- self.assertTrue('TestCase2.fast2' in partitions[0])
+ # Untimed groups just need to be kept together:
+ if 'TestCase3.test1' in partitions[0]:
+ self.assertTrue('TestCase3.test2' in partitions[0])
+ if 'TestCase4.test' not in partitions[0]:
+ self.assertTrue('TestCase4.test' in partitions[1])
+ if 'testdir.testfile.TestCase5.test' not in partitions[0]:
+ self.assertTrue('testdir.testfile.TestCase5.test' in partitions[1])
def test_run_tests_with_instances(self):
# when there are instances and no instance_execute, run_tests acts as
diff --git a/testrepository/tests/ui/test_cli.py b/testrepository/tests/ui/test_cli.py
index e75caa7..4935fa0 100644
--- a/testrepository/tests/ui/test_cli.py
+++ b/testrepository/tests/ui/test_cli.py
@@ -157,7 +157,7 @@ FAIL: testrepository.tests.ui.test_cli.Case.method
----------------------------------------------------------------------
...Traceback (most recent call last):...
File "...test_cli.py", line ..., in method
- self.fail(\'quux\')
+ self.fail(\'quux\')...
AssertionError: quux...
""", doctest.ELLIPSIS))