diff options
-rw-r--r-- | buildstream/_frontend/widget.py | 30 | ||||
-rw-r--r-- | buildstream/_scheduler/resources.py | 2 | ||||
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 65 | ||||
-rw-r--r-- | tests/artifactcache/expiry.py | 64 | ||||
-rw-r--r-- | tests/frontend/logging.py | 4 |
5 files changed, 141 insertions, 24 deletions
diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py index 30c2e9e1a..c43856145 100644 --- a/buildstream/_frontend/widget.py +++ b/buildstream/_frontend/widget.py @@ -175,29 +175,22 @@ class TypeName(Widget): # A widget for displaying the Element name class ElementName(Widget): - def __init__(self, context, content_profile, format_profile): - super(ElementName, self).__init__(context, content_profile, format_profile) - - # Pre initialization format string, before we know the length of - # element names in the pipeline - self._fmt_string = '{: <30}' - def render(self, message): + action_name = message.action_name element_id = message.task_id or message.unique_id - if element_id is None: - return "" - - plugin = _plugin_lookup(element_id) - name = plugin._get_full_name() + if element_id is not None: + plugin = _plugin_lookup(element_id) + name = plugin._get_full_name() + name = '{: <30}'.format(name) + else: + name = 'core activity' + name = '{: <30}'.format(name) - # Sneak the action name in with the element name - action_name = message.action_name if not action_name: action_name = "Main" return self.content_profile.fmt("{: >5}".format(action_name.lower())) + \ - self.format_profile.fmt(':') + \ - self.content_profile.fmt(self._fmt_string.format(name)) + self.format_profile.fmt(':') + self.content_profile.fmt(name) # A widget for displaying the primary message text @@ -219,9 +212,12 @@ class CacheKey(Widget): def render(self, message): element_id = message.task_id or message.unique_id - if element_id is None or not self._key_length: + if not self._key_length: return "" + if element_id is None: + return ' ' * self._key_length + missing = False key = ' ' * self._key_length plugin = _plugin_lookup(element_id) diff --git a/buildstream/_scheduler/resources.py b/buildstream/_scheduler/resources.py index f19d66b44..73bf66b4a 100644 --- a/buildstream/_scheduler/resources.py +++ b/buildstream/_scheduler/resources.py @@ -163,4 +163,4 @@ class Resources(): def unregister_exclusive_interest(self, resources, source): for resource in resources: - self._exclusive_resources[resource].remove(source) + self._exclusive_resources[resource].discard(source) diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 9b688d1dd..7f5249575 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -40,8 +40,8 @@ class SchedStatus(): # Some action names for the internal jobs we launch # -_ACTION_NAME_CLEANUP = 'cleanup' -_ACTION_NAME_CACHE_SIZE = 'cache_size' +_ACTION_NAME_CLEANUP = 'clean' +_ACTION_NAME_CACHE_SIZE = 'size' # Scheduler() @@ -151,6 +151,9 @@ class Scheduler(): # Handle unix signals while running self._connect_signals() + # Check if we need to start with some cache maintenance + self._check_cache_management() + # Run the queues self._sched() self.loop.run_forever() @@ -272,6 +275,31 @@ class Scheduler(): # Local Private Methods # ####################################################### + # _check_cache_management() + # + # Run an initial check if we need to lock the cache + # resource and check the size and possibly launch + # a cleanup. + # + # Sessions which do not add to the cache are not affected. + # + def _check_cache_management(self): + + # Only trigger the check for a scheduler run which has + # queues which require the CACHE resource. + if not any(q for q in self.queues + if ResourceType.CACHE in q.resources): + return + + # If the estimated size outgrows the quota, queue a job to + # actually check the real cache size initially, this one + # should have exclusive access to the cache to ensure nothing + # starts while we are checking the cache. + # + artifacts = self.context.artifactcache + if artifacts.has_quota_exceeded(): + self._sched_cache_size_job(exclusive=True) + # _spawn_job() # # Spanws a job @@ -292,6 +320,11 @@ class Scheduler(): self._cache_size_running = None self.resources.release([ResourceType.CACHE, ResourceType.PROCESS]) + # Unregister the exclusive interest if there was any + self.resources.unregister_exclusive_interest( + [ResourceType.CACHE], 'cache-size' + ) + # Schedule a cleanup job if we've hit the threshold if status != JobStatus.OK: return @@ -344,11 +377,35 @@ class Scheduler(): # Runs a cache size job if one is scheduled to run now and # sufficient recources are available. # - def _sched_cache_size_job(self): + # Args: + # exclusive (bool): Run a cache size job immediately and + # hold the ResourceType.CACHE resource + # exclusively (used at startup). + # + def _sched_cache_size_job(self, *, exclusive=False): + + # The exclusive argument is not intended (or safe) for arbitrary use. + if exclusive: + assert not self._cache_size_scheduled + assert not self._cache_size_running + assert not self._active_jobs + self._cache_size_scheduled = True if self._cache_size_scheduled and not self._cache_size_running: - if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]): + # Handle the exclusive launch + exclusive_resources = set() + if exclusive: + exclusive_resources.add(ResourceType.CACHE) + self.resources.register_exclusive_interest( + exclusive_resources, 'cache-size' + ) + + # Reserve the resources (with the possible exclusive cache resource) + if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS], + exclusive_resources): + + # Update state and launch self._cache_size_scheduled = False self._cache_size_running = \ CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE, diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py index 2bdca0d12..d7bafe7e8 100644 --- a/tests/artifactcache/expiry.py +++ b/tests/artifactcache/expiry.py @@ -18,6 +18,7 @@ # import os +import re from unittest import mock import pytest @@ -425,3 +426,66 @@ def test_extract_expiry(cli, datafiles, tmpdir): assert os.path.isdir(refsdirtarget2) assert not os.path.exists(refsdirtarget) + + +# Ensures that when launching BuildStream with a full artifact cache, +# the cache size and cleanup jobs are run before any other jobs. +# +@pytest.mark.datafiles(DATA_DIR) +def test_cleanup_first(cli, datafiles, tmpdir): + project = os.path.join(datafiles.dirname, datafiles.basename) + element_path = 'elements' + cache_location = os.path.join(project, 'cache', 'artifacts', 'ostree') + checkout = os.path.join(project, 'checkout') + + cli.configure({ + 'cache': { + 'quota': 10000000, + } + }) + + # Create an element that uses almost the entire cache (an empty + # ostree cache starts at about ~10KiB, so we need a bit of a + # buffer) + create_element_size('target.bst', project, element_path, [], 8000000) + res = cli.run(project=project, args=['build', 'target.bst']) + res.assert_success() + + assert cli.get_element_state(project, 'target.bst') == 'cached' + + # Now configure with a smaller quota, create a situation + # where the cache must be cleaned up before building anything else. + # + # Fix the fetchers and builders just to ensure a predictable + # sequence of events (although it does not effect this test) + cli.configure({ + 'cache': { + 'quota': 5000000, + }, + 'scheduler': { + 'fetchers': 1, + 'builders': 1 + } + }) + + # Our cache is now more than full, BuildStream + create_element_size('target2.bst', project, element_path, [], 4000000) + res = cli.run(project=project, args=['build', 'target2.bst']) + res.assert_success() + + # Find all of the activity (like push, pull, fetch) lines + results = re.findall(r'\[.*\]\[.*\]\[\s*(\S+):.*\]\s*START\s*.*\.log', res.stderr) + + # Don't bother checking the order of 'fetch', it is allowed to start + # before or after the initial cache size job, runs in parallel, and does + # not require ResourceType.CACHE. + results.remove('fetch') + print(results) + + # Assert the expected sequence of events + assert results == ['size', 'clean', 'build'] + + # Check that the correct element remains in the cache + states = cli.get_element_states(project, ['target.bst', 'target2.bst']) + assert states['target.bst'] != 'cached' + assert states['target2.bst'] == 'cached' diff --git a/tests/frontend/logging.py b/tests/frontend/logging.py index a10f62cc1..3243e74bc 100644 --- a/tests/frontend/logging.py +++ b/tests/frontend/logging.py @@ -41,7 +41,7 @@ def test_default_logging(cli, tmpdir, datafiles): result = cli.run(project=project, args=['source', 'fetch', element_name]) result.assert_success() - m = re.search(r"\[\d\d:\d\d:\d\d\]\[\]\[\] SUCCESS Checking sources", result.stderr) + m = re.search(r"\[\d\d:\d\d:\d\d\]\[\s*\]\[.*\] SUCCESS Checking sources", result.stderr) assert(m is not None) @@ -77,7 +77,7 @@ def test_custom_logging(cli, tmpdir, datafiles): result = cli.run(project=project, args=['source', 'fetch', element_name]) result.assert_success() - m = re.search(r"\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,,,SUCCESS,Checking sources", result.stderr) + m = re.search(r"\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,\s*,.*,SUCCESS,Checking sources", result.stderr) assert(m is not None) |