author    Tristan Van Berkom <tristan.van.berkom@gmail.com>  2019-01-24 18:01:41 +0000
committer Tristan Van Berkom <tristan.van.berkom@gmail.com>  2019-01-24 18:01:41 +0000
commit    acd0bf224cb447b3c650da22d4e3b55964a87521 (patch)
tree      0d54aa8915c516b5e2c31fab2d520f7267cb8c0f
parent    05587f22bd157e188e2083f11893e7c801b3f50e (diff)
parent    fdb8ff650e4c1dd274463a7cb2ef5708bcd096b7 (diff)
download  buildstream-acd0bf224cb447b3c650da22d4e3b55964a87521.tar.gz
Merge branch 'tristan/cache-management' into 'master'

Cache management fixes

See merge request BuildStream/buildstream!1091
-rw-r--r--  buildstream/_frontend/widget.py      30
-rw-r--r--  buildstream/_scheduler/resources.py   2
-rw-r--r--  buildstream/_scheduler/scheduler.py  65
-rw-r--r--  tests/artifactcache/expiry.py        64
-rw-r--r--  tests/frontend/logging.py             4

5 files changed, 141 insertions, 24 deletions
diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py
index 30c2e9e1a..c43856145 100644
--- a/buildstream/_frontend/widget.py
+++ b/buildstream/_frontend/widget.py
@@ -175,29 +175,22 @@ class TypeName(Widget):
 # A widget for displaying the Element name
 class ElementName(Widget):
 
-    def __init__(self, context, content_profile, format_profile):
-        super(ElementName, self).__init__(context, content_profile, format_profile)
-
-        # Pre initialization format string, before we know the length of
-        # element names in the pipeline
-        self._fmt_string = '{: <30}'
-
     def render(self, message):
+        action_name = message.action_name
         element_id = message.task_id or message.unique_id
-        if element_id is None:
-            return ""
-
-        plugin = _plugin_lookup(element_id)
-        name = plugin._get_full_name()
+        if element_id is not None:
+            plugin = _plugin_lookup(element_id)
+            name = plugin._get_full_name()
+            name = '{: <30}'.format(name)
+        else:
+            name = 'core activity'
+            name = '{: <30}'.format(name)
 
-        # Sneak the action name in with the element name
-        action_name = message.action_name
         if not action_name:
            action_name = "Main"
 
         return self.content_profile.fmt("{: >5}".format(action_name.lower())) + \
-               self.format_profile.fmt(':') + \
-               self.content_profile.fmt(self._fmt_string.format(name))
+               self.format_profile.fmt(':') + self.content_profile.fmt(name)
 
 
 # A widget for displaying the primary message text
@@ -219,9 +212,12 @@ class CacheKey(Widget):
 
     def render(self, message):
         element_id = message.task_id or message.unique_id
-        if element_id is None or not self._key_length:
+        if not self._key_length:
             return ""
 
+        if element_id is None:
+            return ' ' * self._key_length
+
         missing = False
         key = ' ' * self._key_length
         plugin = _plugin_lookup(element_id)
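Note: the fixed-width columns above come from standard str.format alignment specs, nothing BuildStream-specific: '{: <30}' left-aligns into 30 characters and '{: >5}' right-aligns into 5. A standalone sketch (the sample values here are made up, not taken from this diff):

    # Minimal sketch of the padding used by ElementName and CacheKey above.
    for name in ['core activity', 'base/alpine.bst']:   # hypothetical names
        print(repr('{: <30}'.format(name)))   # left-aligned, padded to 30 chars
    print(repr('{: >5}'.format('size')))      # right-aligned action name: ' size'
    key_length = 8
    print(repr(' ' * key_length))             # blank, space-padded cache-key column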
diff --git a/buildstream/_scheduler/resources.py b/buildstream/_scheduler/resources.py
index f19d66b44..73bf66b4a 100644
--- a/buildstream/_scheduler/resources.py
+++ b/buildstream/_scheduler/resources.py
@@ -163,4 +163,4 @@ class Resources():
 
     def unregister_exclusive_interest(self, resources, source):
         for resource in resources:
-            self._exclusive_resources[resource].remove(source)
+            self._exclusive_resources[resource].discard(source)
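The one-line resources.py fix is about Python set semantics: set.remove() raises KeyError when the element is absent, while set.discard() is a no-op, so unregistering an interest that was never registered (or was already dropped) no longer crashes the scheduler. A quick illustration:

    # set.remove() vs set.discard(), the difference behind the fix above.
    interested = {'cache-size'}
    interested.discard('cache-size')   # removes the element
    interested.discard('cache-size')   # already absent: silently does nothing
    try:
        interested.remove('cache-size')
    except KeyError:
        print('remove() raises on a missing element')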
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 9b688d1dd..7f5249575 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -40,8 +40,8 @@ class SchedStatus():
 
 # Some action names for the internal jobs we launch
 #
-_ACTION_NAME_CLEANUP = 'cleanup'
-_ACTION_NAME_CACHE_SIZE = 'cache_size'
+_ACTION_NAME_CLEANUP = 'clean'
+_ACTION_NAME_CACHE_SIZE = 'size'
 
 
 # Scheduler()
@@ -151,6 +151,9 @@ class Scheduler():
         # Handle unix signals while running
         self._connect_signals()
 
+        # Check if we need to start with some cache maintenance
+        self._check_cache_management()
+
         # Run the queues
         self._sched()
         self.loop.run_forever()
@@ -272,6 +275,31 @@
     #                  Local Private Methods              #
     #######################################################
 
+    # _check_cache_management()
+    #
+    # Run an initial check to determine whether we need to lock
+    # the cache resource, check its size and possibly launch
+    # a cleanup.
+    #
+    # Sessions which do not add to the cache are not affected.
+    #
+    def _check_cache_management(self):
+
+        # Only trigger the check for a scheduler run which has
+        # queues which require the CACHE resource.
+        if not any(q for q in self.queues
+                   if ResourceType.CACHE in q.resources):
+            return
+
+        # If the estimated size outgrows the quota, queue a job to
+        # check the real cache size first. This job should have
+        # exclusive access to the cache, to ensure that nothing
+        # else starts while we are checking the cache.
+        #
+        artifacts = self.context.artifactcache
+        if artifacts.has_quota_exceeded():
+            self._sched_cache_size_job(exclusive=True)
+
     # _spawn_job()
     #
     # Spawns a job
@@ -292,6 +320,11 @@
         self._cache_size_running = None
         self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
 
+        # Unregister the exclusive interest if there was any
+        self.resources.unregister_exclusive_interest(
+            [ResourceType.CACHE], 'cache-size'
+        )
+
         # Schedule a cleanup job if we've hit the threshold
         if status != JobStatus.OK:
             return
@@ -344,11 +377,35 @@
     # Runs a cache size job if one is scheduled to run now and
     # sufficient resources are available.
     #
-    def _sched_cache_size_job(self):
+    # Args:
+    #    exclusive (bool): Run a cache size job immediately and
+    #                      hold the ResourceType.CACHE resource
+    #                      exclusively (used at startup).
+    #
+    def _sched_cache_size_job(self, *, exclusive=False):
+
+        # The exclusive argument is not intended (or safe) for arbitrary use.
+        if exclusive:
+            assert not self._cache_size_scheduled
+            assert not self._cache_size_running
+            assert not self._active_jobs
+            self._cache_size_scheduled = True
 
         if self._cache_size_scheduled and not self._cache_size_running:
-            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS]):
+            # Handle the exclusive launch
+            exclusive_resources = set()
+            if exclusive:
+                exclusive_resources.add(ResourceType.CACHE)
+                self.resources.register_exclusive_interest(
+                    exclusive_resources, 'cache-size'
+                )
+
+            # Reserve the resources (with the possible exclusive cache resource)
+            if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
+                                      exclusive_resources):
+
+                # Update state and launch
                 self._cache_size_scheduled = False
                 self._cache_size_running = \
                     CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
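The scheduler changes above hinge on the Resources exclusive-interest mechanism: a source registers exclusive interest in ResourceType.CACHE, ordinary jobs can then no longer reserve that resource, and the exclusive job itself only reserves once existing holders have drained. The sketch below is a simplified, hypothetical model of that hand-off, not BuildStream's actual Resources class:

    # Illustrative model of exclusive resource interest (assumed semantics,
    # inferred from the diff above; not the real Resources implementation).
    class ToyResources:
        def __init__(self):
            self._held = {}        # resource -> number of current holders
            self._exclusive = {}   # resource -> sources with exclusive interest

        def register_exclusive_interest(self, resources, source):
            for r in resources:
                self._exclusive.setdefault(r, set()).add(source)

        def unregister_exclusive_interest(self, resources, source):
            for r in resources:
                # discard(), not remove(): tolerate double unregistration
                self._exclusive.setdefault(r, set()).discard(source)

        def reserve(self, resources, exclusive=frozenset()):
            for r in resources:
                if r in exclusive and self._held.get(r, 0) > 0:
                    return False   # exclusive job waits for holders to drain
                if r not in exclusive and self._exclusive.get(r):
                    return False   # someone wants it exclusively; back off
            for r in resources:
                self._held[r] = self._held.get(r, 0) + 1
            return True

    # Usage: the startup cache-size job locks out new CACHE users until it runs.
    res = ToyResources()
    res.register_exclusive_interest({'cache'}, 'cache-size')
    assert not res.reserve(['cache'])                    # ordinary job backs off
    assert res.reserve(['cache'], exclusive={'cache'})   # exclusive job proceeds
    res.unregister_exclusive_interest({'cache'}, 'cache-size')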
diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py
index 2bdca0d12..d7bafe7e8 100644
--- a/tests/artifactcache/expiry.py
+++ b/tests/artifactcache/expiry.py
@@ -18,6 +18,7 @@
 #
 import os
+import re
 from unittest import mock
 
 import pytest
@@ -425,3 +426,66 @@ def test_extract_expiry(cli, datafiles, tmpdir):
 
     assert os.path.isdir(refsdirtarget2)
     assert not os.path.exists(refsdirtarget)
+
+
+# Ensures that when launching BuildStream with a full artifact cache,
+# the cache size and cleanup jobs are run before any other jobs.
+#
+@pytest.mark.datafiles(DATA_DIR)
+def test_cleanup_first(cli, datafiles, tmpdir):
+    project = os.path.join(datafiles.dirname, datafiles.basename)
+    element_path = 'elements'
+    cache_location = os.path.join(project, 'cache', 'artifacts', 'ostree')
+    checkout = os.path.join(project, 'checkout')
+
+    cli.configure({
+        'cache': {
+            'quota': 10000000,
+        }
+    })
+
+    # Create an element that uses almost the entire cache (an empty
+    # ostree cache starts at about 10KiB, so we need a bit of a
+    # buffer)
+    create_element_size('target.bst', project, element_path, [], 8000000)
+    res = cli.run(project=project, args=['build', 'target.bst'])
+    res.assert_success()
+
+    assert cli.get_element_state(project, 'target.bst') == 'cached'
+
+    # Now configure a smaller quota, creating a situation where the
+    # cache must be cleaned up before building anything else.
+    #
+    # Fix the fetchers and builders just to ensure a predictable
+    # sequence of events (although it does not affect this test)
+    cli.configure({
+        'cache': {
+            'quota': 5000000,
+        },
+        'scheduler': {
+            'fetchers': 1,
+            'builders': 1
+        }
+    })
+
+    # Our cache is now more than full, so BuildStream must clean up first.
+    create_element_size('target2.bst', project, element_path, [], 4000000)
+    res = cli.run(project=project, args=['build', 'target2.bst'])
+    res.assert_success()
+
+    # Find all of the activity (like push, pull, fetch) lines
+    results = re.findall(r'\[.*\]\[.*\]\[\s*(\S+):.*\]\s*START\s*.*\.log', res.stderr)
+
+    # Don't bother checking the order of 'fetch', it is allowed to start
+    # before or after the initial cache size job, runs in parallel, and does
+    # not require ResourceType.CACHE.
+    results.remove('fetch')
+    print(results)
+
+    # Assert the expected sequence of events
+    assert results == ['size', 'clean', 'build']
+
+    # Check that the correct element remains in the cache
+    states = cli.get_element_states(project, ['target.bst', 'target2.bst'])
+    assert states['target.bst'] != 'cached'
+    assert states['target2.bst'] == 'cached'
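The findall() in the test above pulls the action name out of the third bracketed column of each START line. A standalone demonstration against an invented log line (the field contents are hypothetical, modeled on the widget changes earlier in this merge):

    import re

    # Hypothetical START line; the third bracket holds 'action:element name'.
    line = "[12:00:00][        ][ size:core activity        ] START   cache_size.log"
    print(re.findall(r'\[.*\]\[.*\]\[\s*(\S+):.*\]\s*START\s*.*\.log', line))
    # ['size']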
diff --git a/tests/frontend/logging.py b/tests/frontend/logging.py
index a10f62cc1..3243e74bc 100644
--- a/tests/frontend/logging.py
+++ b/tests/frontend/logging.py
@@ -41,7 +41,7 @@ def test_default_logging(cli, tmpdir, datafiles):
 
     result = cli.run(project=project, args=['source', 'fetch', element_name])
     result.assert_success()
 
-    m = re.search(r"\[\d\d:\d\d:\d\d\]\[\]\[\] SUCCESS Checking sources", result.stderr)
+    m = re.search(r"\[\d\d:\d\d:\d\d\]\[\s*\]\[.*\] SUCCESS Checking sources", result.stderr)
 
     assert(m is not None)
@@ -77,7 +77,7 @@ def test_custom_logging(cli, tmpdir, datafiles):
 
     result = cli.run(project=project, args=['source', 'fetch', element_name])
     result.assert_success()
 
-    m = re.search(r"\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,,,SUCCESS,Checking sources", result.stderr)
+    m = re.search(r"\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,\s*,.*,SUCCESS,Checking sources", result.stderr)
 
     assert(m is not None)
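Both regex updates account for the new column layout introduced in widget.py: the cache-key column is now space-padded rather than empty, and the name column always has content ('core activity' for non-element messages). A quick check against invented log lines in each format (the field contents are hypothetical):

    import re

    plain = "[12:00:01][        ][ main:core activity          ] SUCCESS Checking sources"
    print(bool(re.search(
        r"\[\d\d:\d\d:\d\d\]\[\s*\]\[.*\] SUCCESS Checking sources", plain)))  # True

    custom = "12:00:01,00:00:00.000123,12:00:01,        ,core activity,SUCCESS,Checking sources"
    print(bool(re.search(
        r"\d\d:\d\d:\d\d,\d\d:\d\d:\d\d.\d{6},\d\d:\d\d:\d\d,\s*,.*,SUCCESS,Checking sources",
        custom)))  # True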