diff options
Diffstat (limited to 'testrepository/testcommand.py')
-rw-r--r-- | testrepository/testcommand.py | 136 |
1 files changed, 55 insertions, 81 deletions
diff --git a/testrepository/testcommand.py b/testrepository/testcommand.py index 65a8f61..d93881d 100644 --- a/testrepository/testcommand.py +++ b/testrepository/testcommand.py @@ -16,6 +16,7 @@ from extras import try_imports +from collections import defaultdict ConfigParser = try_imports(['ConfigParser', 'configparser']) import itertools import operator @@ -134,7 +135,7 @@ class TestListingFixture(Fixture): def __init__(self, test_ids, cmd_template, listopt, idoption, ui, repository, parallel=True, listpath=None, parser=None, - test_filters=None, instance_source=None, group_regex=None): + test_filters=None, instance_source=None, group_callback=None): """Create a TestListingFixture. :param test_ids: The test_ids to use. May be None indicating that @@ -167,8 +168,10 @@ class TestListingFixture(Fixture): :param instance_source: A source of test run instances. Must support obtain_instance(max_concurrency) -> id and release_instance(id) calls. - :param group_regex: An optional regular expression string which is used - to provide a grouping hint to the test partitioner + :param group_callback: If supplied, should be a function that accepts a + test id and returns a group id. A group id is an arbitrary value + used as a dictionary key in the scheduler. All test ids with the + same group id are scheduled onto the same backend test process. """ self.test_ids = test_ids self.template = cmd_template @@ -180,8 +183,8 @@ class TestListingFixture(Fixture): self._listpath = listpath self._parser = parser self.test_filters = test_filters + self._group_callback = group_callback self._instance_source = instance_source - self.group_regex = group_regex def setUp(self): super(TestListingFixture, self).setUp() @@ -330,7 +333,6 @@ class TestListingFixture(Fixture): :return: A list of spawned processes. """ result = [] - group_tags = None test_ids = self.test_ids if self.concurrency == 1 and (test_ids is None or test_ids): # Have to customise cmd here, as instances are allocated @@ -347,11 +349,8 @@ class TestListingFixture(Fixture): return [CallWhenProcFinishes(run_proc, lambda:self._instance_source.release_instance(instance))] else: - return [run_proc] - if self.group_regex: - group_tags = self.filter_test_groups(test_ids, self.group_regex) - test_id_groups = self.partition_tests(test_ids, self.concurrency, - group_tags) + return [run_proc] + test_id_groups = self.partition_tests(test_ids, self.concurrency) for test_ids in test_id_groups: if not test_ids: # No tests in this partition @@ -363,28 +362,7 @@ class TestListingFixture(Fixture): result.extend(fixture.run_tests()) return result - def filter_test_groups(self, test_ids, group_regex): - """Add a group tag based on the regex provided - - :return A dict with the group tags as keys and a list of - test ids that are a member of the group tag as the value - """ - - group_dict = {} - expr = re.compile(group_regex) - for test_id in test_ids: - match = expr.match(test_id) - if match: - group_id = match.group(0) - else: - group_id = None - if group_dict.get(group_id): - group_dict[group_id].append(test_id) - else: - group_dict[group_id] = [test_id] - return group_dict - - def partition_tests(self, test_ids, concurrency, group_tags=None): + def partition_tests(self, test_ids, concurrency): """Parition test_ids by concurrency. Test durations from the repository are used to get partitions which @@ -395,63 +373,59 @@ class TestListingFixture(Fixture): :return: A list where each element is a distinct subset of test_ids, and the union of all the elements is equal to set(test_ids). """ - partitions = [list() for i in range(concurrency)] timed_partitions = [[0.0, partition] for partition in partitions] time_data = self.repository.get_test_times(test_ids) - timed = time_data['known'] - unknown = time_data['unknown'] - # Schedule test groups by the sum of execute time for each test that is - # a member of the group - if group_tags: - group_timed = {} - group_unknown = [] - for group_tag in group_tags.keys(): - time = 0.0 - for test_id in group_tags[group_tag]: - # If a test_id is not timed remove the whole group from the - # timed groups dict and - if test_id in unknown: - if group_tag in group_timed.keys(): - group_timed.pop(group_tag, None) - group_unknown.append(group_tag) - break - time = time + timed[test_id] - group_timed[group_tag] = (group_tags[group_tag], time) - - queue = sorted(group_timed.items(), - key=operator.itemgetter(1), - reverse=True) - - # Sort the tests by runtime - for group_tag, test_tuple in queue: - test_ids = test_tuple[0] - duration = test_tuple[1] - timed_partitions[0][0] = timed_partitions[0][0] + duration - # Handle groups larger than a single entry - timed_partitions[0][1].extend(test_ids) - timed_partitions.sort(key=lambda item: (item[0], len(item[1]))) - for partition, group_id in zip(itertools.cycle(partitions), - group_unknown): - partition = partition + group_tags[group_id] - return partitions - + timed_tests = time_data['known'] + unknown_tests = time_data['unknown'] + # Group tests: generate group_id -> test_ids. + group_ids = defaultdict(list) + if self._group_callback is None: + group_callback = lambda _:None + else: + group_callback = self._group_callback + for test_id in test_ids: + group_id = group_callback(test_id) or test_id + group_ids[group_id].append(test_id) + # Time groups: generate three sets of groups: + # - fully timed dict(group_id -> time), + # - partially timed dict(group_id -> time) and + # - unknown (set of group_id) + # We may in future treat partially timed different for scheduling, but + # at least today we just schedule them after the fully timed groups. + timed = {} + partial = {} + unknown = [] + for group_id, group_tests in group_ids.items(): + untimed_ids = unknown_tests.intersection(group_tests) + group_time = sum([timed_tests[test_id] + for test_id in untimed_ids.symmetric_difference(group_tests)]) + if not untimed_ids: + timed[group_id] = group_time + elif group_time: + partial[group_id] = group_time + else: + unknown.append(group_id) # Scheduling is NP complete in general, so we avoid aiming for # perfection. A quick approximation that is sufficient for our general # needs: - # sort the tests by time - # allocate to partitions by putting each test in to the partition with - # the current (lowest time, shortest length) - else: - queue = sorted(timed.items(), key=operator.itemgetter(1), reverse=True) - for test_id, duration in queue: + # sort the groups by time + # allocate to partitions by putting each group in to the partition with + # the current (lowest time, shortest length[in tests]) + def consume_queue(groups): + queue = sorted( + groups.items(), key=operator.itemgetter(1), reverse=True) + for group_id, duration in queue: timed_partitions[0][0] = timed_partitions[0][0] + duration - timed_partitions[0][1].append(test_id) + timed_partitions[0][1].extend(group_ids[group_id]) timed_partitions.sort(key=lambda item:(item[0], len(item[1]))) - # Assign tests with unknown times in round robin fashion to the partitions. - for partition, test_id in zip(itertools.cycle(partitions), unknown): - partition.append(test_id) - return partitions + consume_queue(timed) + consume_queue(partial) + # Assign groups with entirely unknown times in round robin fashion to + # the partitions. + for partition, group_id in zip(itertools.cycle(partitions), unknown): + partition.extend(group_ids[group_id]) + return partitions def callout_concurrency(self): """Callout for user defined concurrency.""" |