diff options
author | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-09-10 14:40:06 +0900 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-09-10 16:16:27 +0900 |
commit | 446654a2bb36a73d4f936b49e57bafa7de408d9d (patch) | |
tree | de9d6d9ce94c0b43399e836c02bebe04a50caf48 | |
parent | 0db410471850f9d89f2d33e23edb91bf03f03088 (diff) | |
download | buildstream-tristan/fix-cache-exclusivity.tar.gz |
_artifactcache: There shalt be only one cache size (branch: tristan/fix-cache-exclusivity)
This does a lot of house cleaning, finally bringing cache
cleanup logic to a level of comprehensibility.
Changes in this commit include:
o _artifactcache/artifactcache.py: _cache_size, _cache_quota and
_cache_lower_threshold are now all private variables.
get_approximate_cache_size() is now get_cache_size()
Added get_quota_exceeded() for the purpose of safely checking
if we have exceeded the quota.
set_cache_size() now asserts that the passed size is not None,
it is not acceptable to set a None size cache anymore.
o _artifactcache/cascache.py: No longer set the ArtifactCache
'cache_size' variable violently in the commit() method.
Also the calculate_cache_size() method now unconditionally
calculates the cache size, that is what it's for.
o _scheduler/jobs/cachesizejob.py & _scheduler/jobs/cleanupjob.py:
Now check the success status. Don't try to set the cache size
in the case that the job was terminated.
o _scheduler/jobs/elementjob.py & _scheduler/queues/queue.py:
No longer passing around the cache size from child tasks,
this happens only explicitly, not implicitly for all tasks.
o _scheduler/queues/buildqueue.py & _scheduler/scheduler.py:
Use get_quota_exceeded() accessor
This is a part of #623
-rw-r--r-- | buildstream/_artifactcache/artifactcache.py | 126 | ||||
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 7 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/cachesizejob.py | 8 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/cleanupjob.py | 8 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/elementjob.py | 8 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/buildqueue.py | 2 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 5 | ||||
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 6 |
8 files changed, 84 insertions, 86 deletions
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py index e9a11e23d..9ac68b102 100644 --- a/buildstream/_artifactcache/artifactcache.py +++ b/buildstream/_artifactcache/artifactcache.py @@ -87,12 +87,10 @@ class ArtifactCache(): self.global_remote_specs = [] self.project_remote_specs = {} - self.cache_size = None - self.cache_quota = None - self.cache_lower_threshold = None - - self._required_artifacts = set() - self._estimated_size = None + self._required_artifacts = set() # The artifacts required for this session + self._cache_size = None # The current cache size, sometimes it's an estimate + self._cache_quota = None # The cache quota + self._cache_lower_threshold = None # The target cache size for a cleanup os.makedirs(self.extractdir, exist_ok=True) os.makedirs(self.tmpdir, exist_ok=True) @@ -227,10 +225,16 @@ class ArtifactCache(): # # Clean the artifact cache as much as possible. # + # Returns: + # (int): The size of the cache after having cleaned up + # def clean(self): artifacts = self.list_artifacts() - while self.compute_cache_size() >= self.cache_quota - self.cache_lower_threshold: + # Do a real computation of the cache size once, just in case + self.compute_cache_size() + + while self.get_cache_size() >= self._cache_lower_threshold: try: to_remove = artifacts.pop(0) except IndexError: @@ -244,7 +248,7 @@ class ArtifactCache(): "Please increase the cache-quota in {}." .format(self.context.config_origin or default_conf)) - if self.compute_cache_size() > self.cache_quota: + if self.get_quota_exceeded(): raise ArtifactError("Cache too full. Aborting.", detail=detail, reason="cache-too-full") @@ -253,12 +257,15 @@ class ArtifactCache(): key = to_remove.rpartition('/')[2] if key not in self._required_artifacts: + + # Remove the actual artifact, if it's not required. 
size = self.remove(to_remove) - if size: - self.cache_size -= size + + # Remove the size from the removed size + self.set_cache_size(self._cache_size - size) # This should be O(1) if implemented correctly - return self.compute_cache_size() + return self.get_cache_size() # compute_cache_size() # @@ -269,60 +276,47 @@ class ArtifactCache(): # (int): The size of the artifact cache. # def compute_cache_size(self): - cache_size = self.calculate_cache_size() - - # Keep the estimated size updated here - self._estimated_size = cache_size + self._cache_size = self.calculate_cache_size() - return cache_size + return self._cache_size - # get_approximate_cache_size() + # add_artifact_size() # - # A cheap method that aims to serve as an upper limit on the - # artifact cache size. + # Adds the reported size of a newly cached artifact to the + # overall estimated size. # - # The cache size reported by this function will normally be larger - # than the real cache size, since it is calculated using the - # pre-commit artifact size, but for very small artifacts in - # certain caches additional overhead could cause this to be - # smaller than, but close to, the actual size. + # Args: + # artifact_size (int): The size to add. # - # Nonetheless, in practice this should be safe to use as an upper - # limit on the cache size. + def add_artifact_size(self, artifact_size): + cache_size = self.get_cache_size() + cache_size += artifact_size + + self.set_cache_size(cache_size) + + # get_cache_size() # - # If the cache has built-in constant-time size reporting, please - # feel free to override this method with a more accurate - # implementation. + # Fetches the cached size of the cache, this is sometimes + # an estimate and periodically adjusted to the real size + # when a cache size calculation job runs. + # + # When it is an estimate, the value is either correct, or + # it is greater than the actual cache size. # # Returns: # (int) An approximation of the artifact cache size. 
# - def get_approximate_cache_size(self): - # If we don't currently have an estimate, figure out the real - # cache size. - if self._estimated_size is None: + def get_cache_size(self): + + # If we don't currently have an estimate, figure out the real cache size. + if self._cache_size is None: stored_size = self._read_cache_size() if stored_size is not None: - self._estimated_size = stored_size + self._cache_size = stored_size else: self.compute_cache_size() - return self._estimated_size - - # add_artifact_size() - # - # Adds the reported size of a newly cached artifact to the - # overall estimated size. - # - # Args: - # artifact_size (int): The size to add. - # - def add_artifact_size(self, artifact_size): - if not self._estimated_size: - self.compute_cache_size() - - self._estimated_size += artifact_size - self._write_cache_size(self._estimated_size) + return self._cache_size # set_cache_size() # @@ -335,11 +329,21 @@ class ArtifactCache(): # cache_size (int): The size to set. # def set_cache_size(self, cache_size): - self._estimated_size = cache_size - # set_cache_size is called in cleanup, where it may set the cache to None - if self._estimated_size is not None: - self._write_cache_size(self._estimated_size) + assert cache_size is not None + + self._cache_size = cache_size + self._write_cache_size(self._cache_size) + + # get_quota_exceeded() + # + # Checks if the current artifact cache size exceeds the quota. 
+ # + # Returns: + # (bool): True of the quota is exceeded + # + def get_quota_exceeded(self): + return self.get_cache_size() > self._cache_quota ################################################ # Abstract methods for subclasses to implement # @@ -583,6 +587,9 @@ class ArtifactCache(): # # Writes the given size of the artifact to the cache's size file # + # Args: + # size (int): The size of the artifact cache to record + # def _write_cache_size(self, size): assert isinstance(size, int) size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE) @@ -594,6 +601,9 @@ class ArtifactCache(): # Reads and returns the size of the artifact cache that's stored in the # cache's size file # + # Returns: + # (int): The size of the artifact cache, as recorded in the file + # def _read_cache_size(self): size_file_path = os.path.join(self.context.artifactdir, CACHE_SIZE_FILE) @@ -643,13 +653,13 @@ class ArtifactCache(): stat = os.statvfs(artifactdir_volume) available_space = (stat.f_bsize * stat.f_bavail) - cache_size = self.get_approximate_cache_size() + cache_size = self.get_cache_size() # Ensure system has enough storage for the cache_quota # # If cache_quota is none, set it to the maximum it could possibly be. # - # Also check that cache_quota is atleast as large as our headroom. + # Also check that cache_quota is at least as large as our headroom. # if cache_quota is None: # Infinity, set to max system storage cache_quota = cache_size + available_space @@ -675,8 +685,8 @@ class ArtifactCache(): # if we end up writing more than 2G, but hey, this stuff is # already really fuzzy. 
# - self.cache_quota = cache_quota - headroom - self.cache_lower_threshold = self.cache_quota / 2 + self._cache_quota = cache_quota - headroom + self._cache_lower_threshold = self._cache_quota / 2 # _configured_remote_artifact_cache_specs(): diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 099b252a5..4ef8723a3 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -120,8 +120,6 @@ class CASCache(ArtifactCache): for ref in refs: self.set_ref(ref, tree) - self.cache_size = None - def diff(self, element, key_a, key_b, *, subdir=None): ref_a = self.get_artifact_fullname(element, key_a) ref_b = self.get_artifact_fullname(element, key_b) @@ -488,10 +486,7 @@ class CASCache(ArtifactCache): raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e def calculate_cache_size(self): - if self.cache_size is None: - self.cache_size = utils._get_dir_size(self.casdir) - - return self.cache_size + return utils._get_dir_size(self.casdir) # list_artifacts(): # diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py index 9c1a4bc97..68cd91331 100644 --- a/buildstream/_scheduler/jobs/cachesizejob.py +++ b/buildstream/_scheduler/jobs/cachesizejob.py @@ -32,9 +32,11 @@ class CacheSizeJob(Job): return self._artifacts.compute_cache_size() def parent_complete(self, success, result): - self._artifacts.set_cache_size(result) - if self._complete_cb: - self._complete_cb(result) + if success: + self._artifacts.set_cache_size(result) + + if self._complete_cb: + self._complete_cb(result) def child_process_data(self): return {} diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py index 54d615952..c22ce3b98 100644 --- a/buildstream/_scheduler/jobs/cleanupjob.py +++ b/buildstream/_scheduler/jobs/cleanupjob.py @@ -32,9 +32,11 @@ class CleanupJob(Job): return self._artifacts.clean() def 
parent_complete(self, success, result): - self._artifacts.set_cache_size(result) - if self._complete_cb: - self._complete_cb() + if success: + self._artifacts.set_cache_size(result) + + if self._complete_cb: + self._complete_cb() def child_process_data(self): return {} diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py index a010a6684..8ce5c062f 100644 --- a/buildstream/_scheduler/jobs/elementjob.py +++ b/buildstream/_scheduler/jobs/elementjob.py @@ -18,7 +18,6 @@ # from ruamel import yaml -from ..._platform import Platform from ..._message import Message, MessageType from .job import Job @@ -73,10 +72,6 @@ class ElementJob(Job): self._action_cb = action_cb # The action callable function self._complete_cb = complete_cb # The complete callable function - # Hold on to the artifact cache - platform = Platform.get_platform() - self._artifacts = platform.artifactcache - # Set the task wide ID for logging purposes self.set_task_id(element._get_unique_id()) @@ -114,10 +109,7 @@ class ElementJob(Job): data = {} workspace = self._element._get_workspace() - cache_size = self._artifacts.compute_cache_size() - if workspace is not None: data['workspace'] = workspace.to_dict() - data['cache_size'] = cache_size return data diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index 8902d9adf..6e7ce04aa 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -101,7 +101,7 @@ class BuildQueue(Queue): # If the estimated size outgrows the quota, ask the scheduler # to queue a job to actually check the real cache size. 
# - if artifacts.get_approximate_cache_size() > artifacts.cache_quota: + if artifacts.get_quota_exceeded(): self._scheduler.check_cache_size() def done(self, job, element, result, success): diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 37602fb09..472e033da 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -31,7 +31,6 @@ from ..resources import ResourceType # BuildStream toplevel imports from ..._exceptions import BstError, set_last_task_error from ..._message import Message, MessageType -from ..._platform import Platform # Queue status for a given element @@ -302,10 +301,6 @@ class Queue(): # Update values that need to be synchronized in the main task # before calling any queue implementation self._update_workspaces(element, job) - if job.child_data: - platform = Platform.get_platform() - artifacts = platform.artifactcache - artifacts.cache_size = job.child_data.get('cache_size') # Give the result of the job to the Queue implementor, # and determine if it should be considered as processed diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 0cd13b42f..635b0628c 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -341,7 +341,7 @@ class Scheduler(): # exceeds the cache quota. # # Args: - # cache_size (int): The calculated cache size + # cache_size (int): The calculated cache size (ignored) # # NOTE: This runs in response to completion of the cache size # calculation job lauched by Scheduler.check_cache_size(), @@ -349,7 +349,9 @@ class Scheduler(): # def _run_cleanup(self, cache_size): platform = Platform.get_platform() - if cache_size and cache_size < platform.artifactcache.cache_quota: + artifacts = platform.artifactcache + + if not artifacts.get_quota_exceeded(): return job = CleanupJob(self, 'cleanup', 'cleanup/cleanup', |