diff options
-rw-r--r-- | NEWS | 8 | ||||
-rw-r--r-- | testrepository/commands/load.py | 35 | ||||
-rw-r--r-- | testrepository/repository/memory.py | 5 | ||||
-rw-r--r-- | testrepository/results.py | 2 | ||||
-rw-r--r-- | testrepository/setuptools_command.py | 2 | ||||
-rw-r--r-- | testrepository/testcommand.py | 136 | ||||
-rw-r--r-- | testrepository/tests/commands/test_load.py | 14 | ||||
-rw-r--r-- | testrepository/tests/commands/test_run.py | 1 | ||||
-rw-r--r-- | testrepository/tests/test_testcommand.py | 27 | ||||
-rw-r--r-- | testrepository/tests/ui/test_cli.py | 2 |
10 files changed, 135 insertions, 97 deletions
@@ -8,6 +8,9 @@ NEXT (In development) CHANGES ------- +* Fix Python 3.* support for entrypoints; the initial code was Python3 + incompatible. (Robert Collins, Clark Boylan, #1187192) + * Switch to using multiprocessing to determine CPU counts. (Chris Jones, #1092276) @@ -16,6 +19,11 @@ CHANGES load command to take interactive input without it reading from the raw subunit stream on stdin. (Robert Collins) +* The scheduler can now groups tests together permitting co-dependent tests to + always be scheduled onto the same backend. Note that this does not force + co-dependent tests to be executed, so partial test runs (e.g. --failing) + may still fail. (Matthew Treinish, Robert Collins) + 0.0.15 ++++++ diff --git a/testrepository/commands/load.py b/testrepository/commands/load.py index 96d15f5..0ca4688 100644 --- a/testrepository/commands/load.py +++ b/testrepository/commands/load.py @@ -17,6 +17,7 @@ from functools import partial from operator import methodcaller import optparse +import threading from extras import try_import v2_avail = try_import('subunit.ByteStreamToStreamResult') @@ -29,6 +30,26 @@ from testrepository.commands import Command from testrepository.repository import RepositoryNotFound from testrepository.testcommand import TestCommand +class InputToStreamResult(object): + """Generate Stream events from stdin. + + Really a UI responsibility? + """ + + def __init__(self, stream): + self.source = stream + self.stop = False + + def run(self, result): + while True: + if self.stop: + return + char = self.source.read(1) + if not char: + return + if char == b'a': + result.status(test_id='stdin', test_status='fail') + class load(Command): """Load a subunit stream into a repository. @@ -39,7 +60,7 @@ class load(Command): Unless the stream is a partial stream, any existing failures are discarded. """ - input_streams = ['subunit+'] + input_streams = ['subunit+', 'interactive?'] args = [ExistingPathArgument('streams', min=0, max=None)] options = [ @@ -116,11 +137,23 @@ class load(Command): output_result, summary_result = self.ui.make_result( inserter.get_id, testcommand, previous_run=previous_run) result = testtools.CopyStreamResult([inserter, output_result]) + runner_thread = None result.startTestRun() try: + # Convert user input into a stdin event stream + interactive_streams = list(self.ui.iter_streams('interactive')) + if interactive_streams: + case = InputToStreamResult(interactive_streams[0]) + runner_thread = threading.Thread( + target=case.run, args=(result,)) + runner_thread.daemon = True + runner_thread.start() case.run(result) finally: result.stopTestRun() + if interactive_streams and runner_thread: + runner_thread.stop = True + runner_thread.join(10) if not summary_result.wasSuccessful(): return 1 else: diff --git a/testrepository/repository/memory.py b/testrepository/repository/memory.py index 8aee71f..07eb667 100644 --- a/testrepository/repository/memory.py +++ b/testrepository/repository/memory.py @@ -14,6 +14,7 @@ """In memory storage of test results.""" +from collections import OrderedDict from io import BytesIO from operator import methodcaller @@ -55,7 +56,7 @@ class Repository(AbstractRepository): def __init__(self): # Test runs: self._runs = [] - self._failing = {} # id -> test + self._failing = OrderedDict() # id -> test self._times = {} # id -> duration def count(self): @@ -157,7 +158,7 @@ class _Inserter(AbstractTestRun): self._repository._runs.append(self) self._run_id = len(self._repository._runs) - 1 if not self._partial: - self._repository._failing = {} + self._repository._failing = OrderedDict() for test_dict in self._tests: test_id = test_dict['id'] if test_dict['status'] == 'fail': diff --git a/testrepository/results.py b/testrepository/results.py index 8938314..ed01856 100644 --- a/testrepository/results.py +++ b/testrepository/results.py @@ -29,7 +29,7 @@ class SummarizingResult(StreamSummary): self._last_time = None def status(self, *args, **kwargs): - if 'timestamp' in kwargs: + if kwargs.get('timestamp') is not None: timestamp = kwargs['timestamp'] if self._last_time is None: self._first_time = timestamp diff --git a/testrepository/setuptools_command.py b/testrepository/setuptools_command.py index 2718e3e..fbaf606 100644 --- a/testrepository/setuptools_command.py +++ b/testrepository/setuptools_command.py @@ -77,7 +77,7 @@ class Testr(cmd.Command): raise distutils.errors.DistutilsError( "testr failed (%d)" % testr_ret) if self.slowest: - print "Slowest Tests" + print ("Slowest Tests") self._run_testr("slowest") if self.coverage: self._coverage_after() diff --git a/testrepository/testcommand.py b/testrepository/testcommand.py index 65a8f61..d93881d 100644 --- a/testrepository/testcommand.py +++ b/testrepository/testcommand.py @@ -16,6 +16,7 @@ from extras import try_imports +from collections import defaultdict ConfigParser = try_imports(['ConfigParser', 'configparser']) import itertools import operator @@ -134,7 +135,7 @@ class TestListingFixture(Fixture): def __init__(self, test_ids, cmd_template, listopt, idoption, ui, repository, parallel=True, listpath=None, parser=None, - test_filters=None, instance_source=None, group_regex=None): + test_filters=None, instance_source=None, group_callback=None): """Create a TestListingFixture. :param test_ids: The test_ids to use. May be None indicating that @@ -167,8 +168,10 @@ class TestListingFixture(Fixture): :param instance_source: A source of test run instances. Must support obtain_instance(max_concurrency) -> id and release_instance(id) calls. - :param group_regex: An optional regular expression string which is used - to provide a grouping hint to the test partitioner + :param group_callback: If supplied, should be a function that accepts a + test id and returns a group id. A group id is an arbitrary value + used as a dictionary key in the scheduler. All test ids with the + same group id are scheduled onto the same backend test process. """ self.test_ids = test_ids self.template = cmd_template @@ -180,8 +183,8 @@ class TestListingFixture(Fixture): self._listpath = listpath self._parser = parser self.test_filters = test_filters + self._group_callback = group_callback self._instance_source = instance_source - self.group_regex = group_regex def setUp(self): super(TestListingFixture, self).setUp() @@ -330,7 +333,6 @@ class TestListingFixture(Fixture): :return: A list of spawned processes. """ result = [] - group_tags = None test_ids = self.test_ids if self.concurrency == 1 and (test_ids is None or test_ids): # Have to customise cmd here, as instances are allocated @@ -347,11 +349,8 @@ class TestListingFixture(Fixture): return [CallWhenProcFinishes(run_proc, lambda:self._instance_source.release_instance(instance))] else: - return [run_proc] - if self.group_regex: - group_tags = self.filter_test_groups(test_ids, self.group_regex) - test_id_groups = self.partition_tests(test_ids, self.concurrency, - group_tags) + return [run_proc] + test_id_groups = self.partition_tests(test_ids, self.concurrency) for test_ids in test_id_groups: if not test_ids: # No tests in this partition @@ -363,28 +362,7 @@ class TestListingFixture(Fixture): result.extend(fixture.run_tests()) return result - def filter_test_groups(self, test_ids, group_regex): - """Add a group tag based on the regex provided - - :return A dict with the group tags as keys and a list of - test ids that are a member of the group tag as the value - """ - - group_dict = {} - expr = re.compile(group_regex) - for test_id in test_ids: - match = expr.match(test_id) - if match: - group_id = match.group(0) - else: - group_id = None - if group_dict.get(group_id): - group_dict[group_id].append(test_id) - else: - group_dict[group_id] = [test_id] - return group_dict - - def partition_tests(self, test_ids, concurrency, group_tags=None): + def partition_tests(self, test_ids, concurrency): """Parition test_ids by concurrency. Test durations from the repository are used to get partitions which @@ -395,63 +373,59 @@ class TestListingFixture(Fixture): :return: A list where each element is a distinct subset of test_ids, and the union of all the elements is equal to set(test_ids). """ - partitions = [list() for i in range(concurrency)] timed_partitions = [[0.0, partition] for partition in partitions] time_data = self.repository.get_test_times(test_ids) - timed = time_data['known'] - unknown = time_data['unknown'] - # Schedule test groups by the sum of execute time for each test that is - # a member of the group - if group_tags: - group_timed = {} - group_unknown = [] - for group_tag in group_tags.keys(): - time = 0.0 - for test_id in group_tags[group_tag]: - # If a test_id is not timed remove the whole group from the - # timed groups dict and - if test_id in unknown: - if group_tag in group_timed.keys(): - group_timed.pop(group_tag, None) - group_unknown.append(group_tag) - break - time = time + timed[test_id] - group_timed[group_tag] = (group_tags[group_tag], time) - - queue = sorted(group_timed.items(), - key=operator.itemgetter(1), - reverse=True) - - # Sort the tests by runtime - for group_tag, test_tuple in queue: - test_ids = test_tuple[0] - duration = test_tuple[1] - timed_partitions[0][0] = timed_partitions[0][0] + duration - # Handle groups larger than a single entry - timed_partitions[0][1].extend(test_ids) - timed_partitions.sort(key=lambda item: (item[0], len(item[1]))) - for partition, group_id in zip(itertools.cycle(partitions), - group_unknown): - partition = partition + group_tags[group_id] - return partitions - + timed_tests = time_data['known'] + unknown_tests = time_data['unknown'] + # Group tests: generate group_id -> test_ids. + group_ids = defaultdict(list) + if self._group_callback is None: + group_callback = lambda _:None + else: + group_callback = self._group_callback + for test_id in test_ids: + group_id = group_callback(test_id) or test_id + group_ids[group_id].append(test_id) + # Time groups: generate three sets of groups: + # - fully timed dict(group_id -> time), + # - partially timed dict(group_id -> time) and + # - unknown (set of group_id) + # We may in future treat partially timed different for scheduling, but + # at least today we just schedule them after the fully timed groups. + timed = {} + partial = {} + unknown = [] + for group_id, group_tests in group_ids.items(): + untimed_ids = unknown_tests.intersection(group_tests) + group_time = sum([timed_tests[test_id] + for test_id in untimed_ids.symmetric_difference(group_tests)]) + if not untimed_ids: + timed[group_id] = group_time + elif group_time: + partial[group_id] = group_time + else: + unknown.append(group_id) # Scheduling is NP complete in general, so we avoid aiming for # perfection. A quick approximation that is sufficient for our general # needs: - # sort the tests by time - # allocate to partitions by putting each test in to the partition with - # the current (lowest time, shortest length) - else: - queue = sorted(timed.items(), key=operator.itemgetter(1), reverse=True) - for test_id, duration in queue: + # sort the groups by time + # allocate to partitions by putting each group in to the partition with + # the current (lowest time, shortest length[in tests]) + def consume_queue(groups): + queue = sorted( + groups.items(), key=operator.itemgetter(1), reverse=True) + for group_id, duration in queue: timed_partitions[0][0] = timed_partitions[0][0] + duration - timed_partitions[0][1].append(test_id) + timed_partitions[0][1].extend(group_ids[group_id]) timed_partitions.sort(key=lambda item:(item[0], len(item[1]))) - # Assign tests with unknown times in round robin fashion to the partitions. - for partition, test_id in zip(itertools.cycle(partitions), unknown): - partition.append(test_id) - return partitions + consume_queue(timed) + consume_queue(partial) + # Assign groups with entirely unknown times in round robin fashion to + # the partitions. + for partition, group_id in zip(itertools.cycle(partitions), unknown): + partition.extend(group_ids[group_id]) + return partitions def callout_concurrency(self): """Callout for user defined concurrency.""" diff --git a/testrepository/tests/commands/test_load.py b/testrepository/tests/commands/test_load.py index ed564c8..11c42d7 100644 --- a/testrepository/tests/commands/test_load.py +++ b/testrepository/tests/commands/test_load.py @@ -224,6 +224,20 @@ class TestCommandLoad(ResourcedTestCase): self.assertEqual(0, cmd.execute()) self.assertEqual([], ui.outputs) + def test_load_abort_over_interactive_stream(self): + ui = UI([('subunit', b''), ('interactive', b'a\n')]) + cmd = load.load(ui) + ui.set_command(cmd) + cmd.repository_factory = memory.RepositoryFactory() + cmd.repository_factory.initialise(ui.here) + ret = cmd.execute() + self.assertEqual( + [('results', Wildcard), + ('summary', False, 1, None, None, None, + [('id', 0, None), ('failures', 1, None)])], + ui.outputs) + self.assertEqual(1, ret) + def test_partial_passed_to_repo(self): ui = UI([('subunit', _b(''))], [('quiet', True), ('partial', True)]) cmd = load.load(ui) diff --git a/testrepository/tests/commands/test_run.py b/testrepository/tests/commands/test_run.py index 41a05ad..1efc5df 100644 --- a/testrepository/tests/commands/test_run.py +++ b/testrepository/tests/commands/test_run.py @@ -366,7 +366,6 @@ class TestCommand(ResourcedTestCase): def test_regex_test_filter_with_explicit_ids(self): ui, cmd = self.get_test_ui_and_cmd( args=('g1', '--', 'bar', 'quux'),options=[('failing', True)]) - ui.proc_outputs = ['ab-cd\nefgh\n'] cmd.repository_factory = memory.RepositoryFactory() self.setup_repo(cmd, ui) self.set_config( diff --git a/testrepository/tests/test_testcommand.py b/testrepository/tests/test_testcommand.py index bbe4ac8..b5b3374 100644 --- a/testrepository/tests/test_testcommand.py +++ b/testrepository/tests/test_testcommand.py @@ -362,7 +362,7 @@ class TestTestCommand(ResourcedTestCase): self.assertEqual(1, len(partitions[0])) self.assertEqual(1, len(partitions[1])) - def test_partition_tests_with_group_regex(self): + def test_partition_tests_with_grouping(self): repo = memory.RepositoryFactory().initialise('memory:') result = repo.get_inserter() result.startTestRun() @@ -381,17 +381,26 @@ class TestTestCommand(ResourcedTestCase): 'TestCase2.fast2', 'TestCase4.test', 'testdir.testfile.TestCase5.test']) regex = 'TestCase[0-5]' - group_tags = fixture.filter_test_groups(test_ids, regex) - partitions = fixture.partition_tests(test_ids, 2, group_tags) + def group_id(test_id, regex=re.compile('TestCase[0-5]')): + match = regex.match(test_id) + if match: + return match.group(0) + # There isn't a public way to define a group callback [as yet]. + fixture._group_callback = group_id + partitions = fixture.partition_tests(test_ids, 2) + # Timed groups are deterministic: + self.assertTrue('TestCase2.fast1' in partitions[0]) + self.assertTrue('TestCase2.fast2' in partitions[0]) self.assertTrue('TestCase1.slow' in partitions[1]) self.assertTrue('TestCase1.fast' in partitions[1]) self.assertTrue('TestCase1.fast2' in partitions[1]) - self.assertTrue('TestCase3.test2' in partitions[1]) - self.assertTrue('TestCase3.test1' in partitions[1]) - self.assertTrue('TestCase4.test' in partitions[1]) - self.assertTrue('testdir.testfile.TestCase5.test' in partitions[0]) - self.assertTrue('TestCase2.fast1' in partitions[0]) - self.assertTrue('TestCase2.fast2' in partitions[0]) + # Untimed groups just need to be kept together: + if 'TestCase3.test1' in partitions[0]: + self.assertTrue('TestCase3.test2' in partitions[0]) + if 'TestCase4.test' not in partitions[0]: + self.assertTrue('TestCase4.test' in partitions[1]) + if 'testdir.testfile.TestCase5.test' not in partitions[0]: + self.assertTrue('testdir.testfile.TestCase5.test' in partitions[1]) def test_run_tests_with_instances(self): # when there are instances and no instance_execute, run_tests acts as diff --git a/testrepository/tests/ui/test_cli.py b/testrepository/tests/ui/test_cli.py index e75caa7..4935fa0 100644 --- a/testrepository/tests/ui/test_cli.py +++ b/testrepository/tests/ui/test_cli.py @@ -157,7 +157,7 @@ FAIL: testrepository.tests.ui.test_cli.Case.method ---------------------------------------------------------------------- ...Traceback (most recent call last):... File "...test_cli.py", line ..., in method - self.fail(\'quux\') + self.fail(\'quux\')... AssertionError: quux... """, doctest.ELLIPSIS)) |