summaryrefslogtreecommitdiff
path: root/testrepository/testcommand.py
diff options
context:
space:
mode:
Diffstat (limited to 'testrepository/testcommand.py')
-rw-r--r--testrepository/testcommand.py67
1 files changed, 53 insertions, 14 deletions
diff --git a/testrepository/testcommand.py b/testrepository/testcommand.py
index ef285ab..d93881d 100644
--- a/testrepository/testcommand.py
+++ b/testrepository/testcommand.py
@@ -16,6 +16,7 @@
from extras import try_imports
+from collections import defaultdict
ConfigParser = try_imports(['ConfigParser', 'configparser'])
import itertools
import operator
@@ -134,7 +135,7 @@ class TestListingFixture(Fixture):
def __init__(self, test_ids, cmd_template, listopt, idoption, ui,
repository, parallel=True, listpath=None, parser=None,
- test_filters=None, instance_source=None):
+ test_filters=None, instance_source=None, group_callback=None):
"""Create a TestListingFixture.
:param test_ids: The test_ids to use. May be None indicating that
@@ -167,6 +168,10 @@ class TestListingFixture(Fixture):
:param instance_source: A source of test run instances. Must support
obtain_instance(max_concurrency) -> id and release_instance(id)
calls.
+ :param group_callback: If supplied, should be a function that accepts a
+ test id and returns a group id. A group id is an arbitrary value
+ used as a dictionary key in the scheduler. All test ids with the
+ same group id are scheduled onto the same backend test process.
"""
self.test_ids = test_ids
self.template = cmd_template
@@ -178,6 +183,7 @@ class TestListingFixture(Fixture):
self._listpath = listpath
self._parser = parser
self.test_filters = test_filters
+ self._group_callback = group_callback
self._instance_source = instance_source
def setUp(self):
@@ -370,22 +376,55 @@ class TestListingFixture(Fixture):
partitions = [list() for i in range(concurrency)]
timed_partitions = [[0.0, partition] for partition in partitions]
time_data = self.repository.get_test_times(test_ids)
- timed = time_data['known']
- unknown = time_data['unknown']
+ timed_tests = time_data['known']
+ unknown_tests = time_data['unknown']
+ # Group tests: generate group_id -> test_ids.
+ group_ids = defaultdict(list)
+ if self._group_callback is None:
+ group_callback = lambda _:None
+ else:
+ group_callback = self._group_callback
+ for test_id in test_ids:
+ group_id = group_callback(test_id) or test_id
+ group_ids[group_id].append(test_id)
+ # Time groups: generate three sets of groups:
+ # - fully timed dict(group_id -> time),
+ # - partially timed dict(group_id -> time) and
+ # - unknown (set of group_id)
+ # We may in future treat partially timed different for scheduling, but
+ # at least today we just schedule them after the fully timed groups.
+ timed = {}
+ partial = {}
+ unknown = []
+ for group_id, group_tests in group_ids.items():
+ untimed_ids = unknown_tests.intersection(group_tests)
+ group_time = sum([timed_tests[test_id]
+ for test_id in untimed_ids.symmetric_difference(group_tests)])
+ if not untimed_ids:
+ timed[group_id] = group_time
+ elif group_time:
+ partial[group_id] = group_time
+ else:
+ unknown.append(group_id)
# Scheduling is NP complete in general, so we avoid aiming for
# perfection. A quick approximation that is sufficient for our general
# needs:
- # sort the tests by time
- # allocate to partitions by putting each test in to the partition with
- # the current (lowest time, shortest length)
- queue = sorted(timed.items(), key=operator.itemgetter(1), reverse=True)
- for test_id, duration in queue:
- timed_partitions[0][0] = timed_partitions[0][0] + duration
- timed_partitions[0][1].append(test_id)
- timed_partitions.sort(key=lambda item:(item[0], len(item[1])))
- # Assign tests with unknown times in round robin fashion to the partitions.
- for partition, test_id in zip(itertools.cycle(partitions), unknown):
- partition.append(test_id)
+ # sort the groups by time
+ # allocate to partitions by putting each group in to the partition with
+ # the current (lowest time, shortest length[in tests])
+ def consume_queue(groups):
+ queue = sorted(
+ groups.items(), key=operator.itemgetter(1), reverse=True)
+ for group_id, duration in queue:
+ timed_partitions[0][0] = timed_partitions[0][0] + duration
+ timed_partitions[0][1].extend(group_ids[group_id])
+ timed_partitions.sort(key=lambda item:(item[0], len(item[1])))
+ consume_queue(timed)
+ consume_queue(partial)
+ # Assign groups with entirely unknown times in round robin fashion to
+ # the partitions.
+ for partition, group_id in zip(itertools.cycle(partitions), unknown):
+ partition.extend(group_ids[group_id])
return partitions
def callout_concurrency(self):