summaryrefslogtreecommitdiff
path: root/testrepository/testcommand.py
diff options
context:
space:
mode:
Diffstat (limited to 'testrepository/testcommand.py')
-rw-r--r--testrepository/testcommand.py136
1 files changed, 55 insertions, 81 deletions
diff --git a/testrepository/testcommand.py b/testrepository/testcommand.py
index 65a8f61..d93881d 100644
--- a/testrepository/testcommand.py
+++ b/testrepository/testcommand.py
@@ -16,6 +16,7 @@
from extras import try_imports
+from collections import defaultdict
ConfigParser = try_imports(['ConfigParser', 'configparser'])
import itertools
import operator
@@ -134,7 +135,7 @@ class TestListingFixture(Fixture):
def __init__(self, test_ids, cmd_template, listopt, idoption, ui,
repository, parallel=True, listpath=None, parser=None,
- test_filters=None, instance_source=None, group_regex=None):
+ test_filters=None, instance_source=None, group_callback=None):
"""Create a TestListingFixture.
:param test_ids: The test_ids to use. May be None indicating that
@@ -167,8 +168,10 @@ class TestListingFixture(Fixture):
:param instance_source: A source of test run instances. Must support
obtain_instance(max_concurrency) -> id and release_instance(id)
calls.
- :param group_regex: An optional regular expression string which is used
- to provide a grouping hint to the test partitioner
+ :param group_callback: If supplied, should be a function that accepts a
+ test id and returns a group id. A group id is an arbitrary value
+ used as a dictionary key in the scheduler. All test ids with the
+ same group id are scheduled onto the same backend test process.
"""
self.test_ids = test_ids
self.template = cmd_template
@@ -180,8 +183,8 @@ class TestListingFixture(Fixture):
self._listpath = listpath
self._parser = parser
self.test_filters = test_filters
+ self._group_callback = group_callback
self._instance_source = instance_source
- self.group_regex = group_regex
def setUp(self):
super(TestListingFixture, self).setUp()
@@ -330,7 +333,6 @@ class TestListingFixture(Fixture):
:return: A list of spawned processes.
"""
result = []
- group_tags = None
test_ids = self.test_ids
if self.concurrency == 1 and (test_ids is None or test_ids):
# Have to customise cmd here, as instances are allocated
@@ -347,11 +349,8 @@ class TestListingFixture(Fixture):
return [CallWhenProcFinishes(run_proc,
lambda:self._instance_source.release_instance(instance))]
else:
- return [run_proc]
- if self.group_regex:
- group_tags = self.filter_test_groups(test_ids, self.group_regex)
- test_id_groups = self.partition_tests(test_ids, self.concurrency,
- group_tags)
+ return [run_proc]
+ test_id_groups = self.partition_tests(test_ids, self.concurrency)
for test_ids in test_id_groups:
if not test_ids:
# No tests in this partition
@@ -363,28 +362,7 @@ class TestListingFixture(Fixture):
result.extend(fixture.run_tests())
return result
- def filter_test_groups(self, test_ids, group_regex):
- """Add a group tag based on the regex provided
-
- :return A dict with the group tags as keys and a list of
- test ids that are a member of the group tag as the value
- """
-
- group_dict = {}
- expr = re.compile(group_regex)
- for test_id in test_ids:
- match = expr.match(test_id)
- if match:
- group_id = match.group(0)
- else:
- group_id = None
- if group_dict.get(group_id):
- group_dict[group_id].append(test_id)
- else:
- group_dict[group_id] = [test_id]
- return group_dict
-
- def partition_tests(self, test_ids, concurrency, group_tags=None):
+ def partition_tests(self, test_ids, concurrency):
"""Parition test_ids by concurrency.
Test durations from the repository are used to get partitions which
@@ -395,63 +373,59 @@ class TestListingFixture(Fixture):
:return: A list where each element is a distinct subset of test_ids,
and the union of all the elements is equal to set(test_ids).
"""
-
partitions = [list() for i in range(concurrency)]
timed_partitions = [[0.0, partition] for partition in partitions]
time_data = self.repository.get_test_times(test_ids)
- timed = time_data['known']
- unknown = time_data['unknown']
- # Schedule test groups by the sum of execute time for each test that is
- # a member of the group
- if group_tags:
- group_timed = {}
- group_unknown = []
- for group_tag in group_tags.keys():
- time = 0.0
- for test_id in group_tags[group_tag]:
- # If a test_id is not timed remove the whole group from the
- # timed groups dict and
- if test_id in unknown:
- if group_tag in group_timed.keys():
- group_timed.pop(group_tag, None)
- group_unknown.append(group_tag)
- break
- time = time + timed[test_id]
- group_timed[group_tag] = (group_tags[group_tag], time)
-
- queue = sorted(group_timed.items(),
- key=operator.itemgetter(1),
- reverse=True)
-
- # Sort the tests by runtime
- for group_tag, test_tuple in queue:
- test_ids = test_tuple[0]
- duration = test_tuple[1]
- timed_partitions[0][0] = timed_partitions[0][0] + duration
- # Handle groups larger than a single entry
- timed_partitions[0][1].extend(test_ids)
- timed_partitions.sort(key=lambda item: (item[0], len(item[1])))
- for partition, group_id in zip(itertools.cycle(partitions),
- group_unknown):
- partition = partition + group_tags[group_id]
- return partitions
-
+ timed_tests = time_data['known']
+ unknown_tests = time_data['unknown']
+ # Group tests: generate group_id -> test_ids.
+ group_ids = defaultdict(list)
+ if self._group_callback is None:
+ group_callback = lambda _:None
+ else:
+ group_callback = self._group_callback
+ for test_id in test_ids:
+ group_id = group_callback(test_id) or test_id
+ group_ids[group_id].append(test_id)
+ # Time groups: generate three sets of groups:
+ # - fully timed dict(group_id -> time),
+ # - partially timed dict(group_id -> time) and
+ # - unknown (set of group_id)
+ # We may in future treat partially timed different for scheduling, but
+ # at least today we just schedule them after the fully timed groups.
+ timed = {}
+ partial = {}
+ unknown = []
+ for group_id, group_tests in group_ids.items():
+ untimed_ids = unknown_tests.intersection(group_tests)
+ group_time = sum([timed_tests[test_id]
+ for test_id in untimed_ids.symmetric_difference(group_tests)])
+ if not untimed_ids:
+ timed[group_id] = group_time
+ elif group_time:
+ partial[group_id] = group_time
+ else:
+ unknown.append(group_id)
# Scheduling is NP complete in general, so we avoid aiming for
# perfection. A quick approximation that is sufficient for our general
# needs:
- # sort the tests by time
- # allocate to partitions by putting each test in to the partition with
- # the current (lowest time, shortest length)
- else:
- queue = sorted(timed.items(), key=operator.itemgetter(1), reverse=True)
- for test_id, duration in queue:
+ # sort the groups by time
+ # allocate to partitions by putting each group in to the partition with
+ # the current (lowest time, shortest length[in tests])
+ def consume_queue(groups):
+ queue = sorted(
+ groups.items(), key=operator.itemgetter(1), reverse=True)
+ for group_id, duration in queue:
timed_partitions[0][0] = timed_partitions[0][0] + duration
- timed_partitions[0][1].append(test_id)
+ timed_partitions[0][1].extend(group_ids[group_id])
timed_partitions.sort(key=lambda item:(item[0], len(item[1])))
- # Assign tests with unknown times in round robin fashion to the partitions.
- for partition, test_id in zip(itertools.cycle(partitions), unknown):
- partition.append(test_id)
- return partitions
+ consume_queue(timed)
+ consume_queue(partial)
+ # Assign groups with entirely unknown times in round robin fashion to
+ # the partitions.
+ for partition, group_id in zip(itertools.cycle(partitions), unknown):
+ partition.extend(group_ids[group_id])
+ return partitions
def callout_concurrency(self):
"""Callout for user defined concurrency."""