summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Maat <tristan.maat@codethink.co.uk>2018-07-17 13:05:51 +0100
committerTristan Van Berkom <tristan.vanberkom@codethink.co.uk>2018-07-18 15:07:24 +0900
commitdc17de387961ba54c4742ba504ba7314c7da16aa (patch)
treec7f225c3586ee02419a4be03e288bd604af1020d
parent7229d2e5d03227f174ac359fd1a73ef50c071c5a (diff)
downloadbuildstream-dc17de387961ba54c4742ba504ba7314c7da16aa.tar.gz
Clean the artifact cache when we hit the cache quota
When the cache quota is hit, we will remove any elements not required for the current build, until our cache is only filled halfway.
-rw-r--r--buildstream/_artifactcache/artifactcache.py110
-rw-r--r--buildstream/_artifactcache/cascache.py6
-rw-r--r--buildstream/_scheduler/jobs/__init__.py1
-rw-r--r--buildstream/_scheduler/jobs/cleanupjob.py63
-rw-r--r--buildstream/_scheduler/scheduler.py16
-rw-r--r--buildstream/_stream.py8
6 files changed, 201 insertions, 3 deletions
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 9abe68cd0..5feae93f4 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -21,7 +21,8 @@ import os
import string
from collections import Mapping, namedtuple
-from .._exceptions import ImplError, LoadError, LoadErrorReason
+from ..element import _KeyStrength
+from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
from .._message import Message, MessageType
from .. import utils
from .. import _yaml
@@ -77,6 +78,7 @@ ArtifactCacheSpec.__new__.__defaults__ = (None, None, None)
class ArtifactCache():
def __init__(self, context):
self.context = context
+ self.required_artifacts = set()
self.extractdir = os.path.join(context.artifactdir, 'extract')
self.max_size = context.cache_quota
self.estimated_size = None
@@ -183,6 +185,75 @@ class ArtifactCache():
(str(provenance)))
return cache_specs
+ # append_required_artifacts():
+ #
+ # Append to the list of elements whose artifacts are required for
+ # the current run. Artifacts whose elements are in this list will
+ # be locked by the artifact cache and not touched for the duration
+ # of the current pipeline.
+ #
+ # Args:
+ # elements (iterable): A set of elements to mark as required
+ #
+ def append_required_artifacts(self, elements):
+ # We lock both strong and weak keys - deleting one but not the
+ # other won't save space in most cases anyway, but would be a
+ # user inconvenience.
+
+ for element in elements:
+ strong_key = element._get_cache_key(strength=_KeyStrength.STRONG)
+ weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
+
+ for key in (strong_key, weak_key):
+ if key and key not in self.required_artifacts:
+ self.required_artifacts.add(key)
+
+ # We also update the usage times of any artifacts
+ # we will be using, which helps preventing a
+ # buildstream process that runs in parallel with
+ # this one from removing artifacts in-use.
+ try:
+ self.update_atime(key)
+ except ArtifactError:
+ pass
+
+ # clean():
+ #
+ # Clean the artifact cache as much as possible.
+ #
+ def clean(self):
+ artifacts = self.list_artifacts()
+
+ while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold:
+ try:
+ to_remove = artifacts.pop(0)
+ except IndexError:
+ # If too many artifacts are required, and we therefore
+ # can't remove them, we have to abort the build.
+ #
+ # FIXME: Asking the user what to do may be neater
+ default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
+ 'buildstream.conf')
+ detail = ("There is not enough space to build the given element.\n"
+ "Please increase the cache-quota in {}."
+ .format(self.context.config_origin or default_conf))
+
+ if self.calculate_cache_size() > self.context.cache_quota:
+ raise ArtifactError("Cache too full. Aborting.",
+ detail=detail,
+ reason="cache-too-full")
+ else:
+ break
+
+ key = to_remove.rpartition('/')[2]
+ if key not in self.required_artifacts:
+ size = self.remove(to_remove)
+ if size:
+ self.cache_size -= size
+
+ # This should be O(1) if implemented correctly
+ return self.calculate_cache_size()
+
# get_approximate_cache_size()
#
# A cheap method that aims to serve as an upper limit on the
@@ -216,6 +287,17 @@ class ArtifactCache():
# Abstract methods for subclasses to implement #
################################################
+ # update_atime()
+ #
+ # Update the atime of an artifact.
+ #
+ # Args:
+ # key (str): The key of the artifact.
+ #
+ def update_atime(self, key):
+ raise ImplError("Cache '{kind}' does not implement contains()"
+ .format(kind=type(self).__name__))
+
# initialize_remotes():
#
# This will contact each remote cache.
@@ -241,6 +323,32 @@ class ArtifactCache():
raise ImplError("Cache '{kind}' does not implement contains()"
.format(kind=type(self).__name__))
+ # list_artifacts():
+ #
+ # List artifacts in this cache in LRU order.
+ #
+ # Returns:
+ # ([str]) - A list of artifact names as generated by
+ # `ArtifactCache.get_artifact_fullname` in LRU order
+ #
+ def list_artifacts(self):
+ raise ImplError("Cache '{kind}' does not implement list_artifacts()"
+ .format(kind=type(self).__name__))
+
+ # remove():
+ #
+ # Removes the artifact for the specified ref from the local
+ # artifact cache.
+ #
+ # Args:
+ # ref (artifact_name): The name of the artifact to remove (as
+ # generated by
+ # `ArtifactCache.get_artifact_fullname`)
+ #
+ def remove(self, artifact_name):
+ raise ImplError("Cache '{kind}' does not implement remove()"
+ .format(kind=type(self).__name__))
+
# extract():
#
# Extract cached artifact for the specified Element if it hasn't
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 4f1d8ac18..1e84c282f 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -450,6 +450,12 @@ class CASCache(ArtifactCache):
except FileNotFoundError as e:
raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
+ def update_atime(self, ref):
+ try:
+ os.utime(self._refpath(ref))
+ except FileNotFoundError as e:
+ raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
+
def calculate_cache_size(self):
if self.cache_size is None:
self.cache_size = utils._get_dir_size(self.casdir)
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
index 981558641..185d8258a 100644
--- a/buildstream/_scheduler/jobs/__init__.py
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -1,2 +1,3 @@
from .elementjob import ElementJob
from .cachesizejob import CacheSizeJob
+from .cleanupjob import CleanupJob
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
new file mode 100644
index 000000000..af7ca2b0b
--- /dev/null
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author:
+#  Tristan Daniël Maat <tristan.maat@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from .job import Job
+from ..._platform import Platform
+from ..._message import Message
+
+
+class CleanupJob(Job):
+ def __init__(self, *args, complete_cb, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._complete_cb = complete_cb
+ self._cache = Platform._instance.artifactcache
+
+ def child_process(self):
+ return self._cache.clean()
+
+ def parent_complete(self, success, result):
+ self._cache._set_cache_size(result)
+ if self._complete_cb:
+ self._complete_cb()
+
+ @contextmanager
+ def child_logging_enabled(self, logfile):
+ self._logfile = logfile.format(pid=os.getpid())
+ yield self._logfile
+ self._logfile = None
+
+ def message(self, message_type, message, **kwargs):
+ args = dict(kwargs)
+ args['scheduler'] = True
+ self._scheduler.context.message(Message(None, message_type, message, **args))
+
+ def child_log(self, message):
+ message.action_name = self.action_name
+
+ with open(self._logfile, 'a+') as log:
+ message_text = self.decorate_message(message, '[cleanup]')
+ log.write('{}\n'.format(message_text))
+ log.flush()
+
+ return message
+
+ def child_process_data(self):
+ return {}
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index a11134cc8..aeb32931a 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -28,7 +28,7 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources, ResourceType
-from .jobs import CacheSizeJob
+from .jobs import CacheSizeJob, CleanupJob
# A decent return code for Scheduler.run()
@@ -313,13 +313,25 @@ class Scheduler():
self.schedule_jobs(ready)
self._sched()
+ def _run_cleanup(self, cache_size):
+ if cache_size and cache_size < self.context.cache_quota:
+ return
+
+ logpath = os.path.join(self.context.logdir, 'cleanup.{pid}.log')
+ job = CleanupJob(self, 'cleanup', logpath,
+ resources=[ResourceType.CACHE,
+ ResourceType.PROCESS],
+ exclusive_resources=[ResourceType.CACHE],
+ complete_cb=None)
+ self.schedule_jobs([job])
+
def _check_cache_size_real(self):
logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log')
job = CacheSizeJob(self, 'cache_size', logpath,
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
exclusive_resources=[ResourceType.CACHE],
- complete_cb=None)
+ complete_cb=self._run_cleanup)
self.schedule_jobs([job])
# _suspend_jobs()
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index a29d8b63f..34baa0b05 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -876,6 +876,14 @@ class Stream():
selected,
except_elements)
+ # Set the "required" artifacts that should not be removed
+ # while this pipeline is active
+ #
+ # FIXME: The set of required artifacts should probably be
+ # what's in `selected`, but this does not seem to work
+ # for some reason
+ self._artifacts.append_required_artifacts((e for e in self._pipeline.dependencies(elements, Scope.ALL)))
+
if selection == PipelineSelection.PLAN and dynamic_plan:
# We use a dynamic build plan, only request artifacts of top-level targets,
# others are requested dynamically as needed.