author     Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>  2018-04-10 19:31:18 +0900
committer  Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>  2018-04-10 21:11:54 +0900
commit     a9cbb0f7074286f5323954d327bc923c0028e7c0 (patch)
tree       03a7dd39a6e89f2c633aacff46fafa689792544a
parent     0b5809c4b5901df60fa6a3d9e1aa12b9c13872ee (diff)
download   buildstream-a9cbb0f7074286f5323954d327bc923c0028e7c0.tar.gz
Serialize workspace modifications in the main process.
This patch makes the child process send any workspace state back to the
main process, where it is saved directly before the result of a build job
is processed. Workspace modifications made in child tasks thus work
transparently, and no special care needs to be taken to save them except
when the modification happens in the main process itself.

This also removes the line where we updated workspace data at staging time.

This fixes issue #352.
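The mechanism, in a nutshell: the child serializes its workspace state to a
plain dict and sends it over the job's message queue; the parent stores it and
persists it before handling the job result. Below is a minimal, self-contained
sketch of that idea using plain multiprocessing rather than BuildStream's
Job/Envelope machinery (the dict contents are made up for illustration):

import multiprocessing


def child_task(queue):
    # ... the build action runs here; the workspace may record new state ...
    workspace_dict = {'last_successful': 'abc123', 'running_files': {}}
    queue.put(('workspace', workspace_dict))   # report workspace state first
    queue.put(('result', True))                # then the action's result


def main():
    queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=child_task, args=(queue,))
    child.start()

    workspace_dict = None
    result = None
    for _ in range(2):                         # the child sends exactly two messages
        message_type, payload = queue.get()
        if message_type == 'workspace':
            workspace_dict = payload           # persisted only in the main process
        elif message_type == 'result':
            result = payload

    child.join()
    print(workspace_dict, result)


if __name__ == '__main__':
    main()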
-rw-r--r--  buildstream/_scheduler/job.py    | 39
-rw-r--r--  buildstream/_scheduler/queue.py  | 18
-rw-r--r--  buildstream/element.py           | 26
3 files changed, 64 insertions(+), 19 deletions(-)
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/job.py
index 5d2e22646..5de6fae9e 100644
--- a/buildstream/_scheduler/job.py
+++ b/buildstream/_scheduler/job.py
@@ -77,6 +77,7 @@ class Job():
# Only relevant in parent process after spawning
self.pid = None # The child's pid in the parent
self.result = None # Return value of child action in the parent
+ self.workspace_dict = None # A serialized Workspace object, after any modifications
self.tries = 0
@@ -262,11 +263,8 @@ class Job():
detail=env_dump, logfile=filename)
try:
+ # Try the task action
result = self.action(element)
- if result is not None:
- envelope = Envelope('result', result)
- self.queue.put(envelope)
-
except BstError as e:
elapsed = datetime.datetime.now() - starttime
@@ -278,6 +276,9 @@ class Job():
elapsed=elapsed, detail=e.detail,
logfile=filename, sandbox=e.sandbox)
+ # Report changes in the workspace, even if there was a handled failure
+ self.child_send_workspace(element)
+
# Report the exception to the parent (for internal testing purposes)
self.child_send_error(e)
self.child_shutdown(1)
@@ -295,14 +296,22 @@ class Job():
logfile=filename)
self.child_shutdown(1)
- elapsed = datetime.datetime.now() - starttime
- self.message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed,
- logfile=filename)
+ else:
+ # No exception occurred in the action
+ self.child_send_workspace(element)
+
+ if result is not None:
+ envelope = Envelope('result', result)
+ self.queue.put(envelope)
- # Shutdown needs to stay outside of the above context manager,
- # make sure we dont try to handle SIGTERM while the process
- # is already busy in sys.exit()
- self.child_shutdown(0)
+ elapsed = datetime.datetime.now() - starttime
+ self.message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed,
+ logfile=filename)
+
+ # Shutdown needs to stay outside of the above context manager,
+ # make sure we dont try to handle SIGTERM while the process
+ # is already busy in sys.exit()
+ self.child_shutdown(0)
def child_send_error(self, e):
domain = None
@@ -318,6 +327,12 @@ class Job():
})
self.queue.put(envelope)
+ def child_send_workspace(self, element):
+ workspace = element._get_workspace()
+ if workspace:
+ envelope = Envelope('workspace', workspace.to_dict())
+ self.queue.put(envelope)
+
def child_complete(self, pid, returncode, element):
if returncode != 0 and self.tries <= self.max_retries:
self.shutdown()
@@ -406,6 +421,8 @@ class Job():
elif envelope.message_type == 'result':
assert self.result is None
self.result = envelope.message
+ elif envelope.message_type == 'workspace':
+ self.workspace_dict = envelope.message
else:
raise Exception()
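For context, the 'workspace' envelope above carries whatever
`Workspace.to_dict()` returns. A hypothetical sketch of the round-trip the
parent relies on (this is not BuildStream's real Workspace class, only an
illustration of why plain, picklable dicts cross the process boundary):

class Workspace:

    def __init__(self, path, last_successful=None, running_files=None):
        self.path = path
        self.last_successful = last_successful
        self.running_files = running_files or {}

    def to_dict(self):
        # Plain data only, so the state can travel through the job's queue
        return {
            'path': self.path,
            'last_successful': self.last_successful,
            'running_files': self.running_files,
        }

    @classmethod
    def from_dict(cls, data):
        # How the main process could reconstruct the state it receives
        return cls(data['path'], data['last_successful'], data['running_files'])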
diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queue.py
index d03ce6864..e92625635 100644
--- a/buildstream/_scheduler/queue.py
+++ b/buildstream/_scheduler/queue.py
@@ -205,12 +205,30 @@ class Queue():
# first priority again next time around
self.wait_queue.extendleft(unready)
+ def update_workspaces(self, element, job):
+ # Handle any workspace modifications now
+ #
+ if job.workspace_dict:
+ project = element._get_project()
+ if project.workspaces.update_workspace(element.name, job.workspace_dict):
+ try:
+ project.workspaces.save_config()
+ except BstError as e:
+ self.message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
+ except Exception as e: # pylint: disable=broad-except
+ self.message(element, MessageType.BUG,
+ "Unhandled exception while saving workspaces",
+ detail=traceback.format_exc())
+
def job_done(self, job, returncode, element):
# Shutdown the job
job.shutdown()
self.active_jobs.remove(job)
+ # Update workspaces in the main task before calling any queue implementation
+ self.update_workspaces(element, job)
+
# Give the result of the job to the Queue implementor,
# and determine if it should be considered as processed
# or skipped.
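The parent-side handling above assumes a small contract from the project's
workspaces object: `update_workspace()` merges the dict received from the
child and reports whether anything actually changed, and `save_config()`
persists the configuration to disk. A hypothetical sketch of that contract
(not the real buildstream/_workspaces implementation; the JSON file is only
for illustration):

import json


class Workspaces:

    def __init__(self, config_path):
        self.config_path = config_path
        self._workspaces = {}     # element name -> workspace dict

    def update_workspace(self, element_name, workspace_dict):
        # Only report a change when the stored state actually differs,
        # so the caller can skip rewriting the on-disk config otherwise
        changed = self._workspaces.get(element_name) != workspace_dict
        if changed:
            self._workspaces[element_name] = workspace_dict
        return changed

    def save_config(self):
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(self._workspaces, f, indent=2)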
diff --git a/buildstream/element.py b/buildstream/element.py
index 4c7b96d51..48d75274f 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -493,13 +493,11 @@ class Element(Plugin):
files_written = {}
old_dep_keys = {}
project = self._get_project()
+ workspace = self._get_workspace()
- if self._can_build_incrementally():
- workspace = self._get_workspace()
-
- if workspace.last_successful:
- old_meta = self._get_artifact_metadata(workspace.last_successful)
- old_dep_keys = old_meta['keys']['dependencies']
+ if self._can_build_incrementally() and workspace.last_successful:
+ old_meta = self._get_artifact_metadata(workspace.last_successful)
+ old_dep_keys = old_meta['keys']['dependencies']
for dep in self.dependencies(scope):
# If we are workspaced, and we therefore perform an
@@ -507,7 +505,7 @@ class Element(Plugin):
# of any files created by our dependencies since the last
# successful build.
to_update = None
- if self._get_workspace() and old_dep_keys:
+ if workspace and old_dep_keys:
dep._assert_cached()
if dep.name in old_dep_keys:
@@ -519,9 +517,13 @@ class Element(Plugin):
# build systems anyway.
to_update, _, added = self.__artifacts.diff(dep, key_old, key_new, subdir='files')
workspace.add_running_files(dep.name, to_update + added)
- project.workspaces.save_config()
to_update.extend(workspace.running_files[dep.name])
+ # In case we are running `bst shell`, this happens in the
+ # main process and we need to update the workspace config
+ if utils._is_main_process():
+ project.workspaces.save_config()
+
result = dep.stage_artifact(sandbox,
path=path,
include=include,
@@ -879,6 +881,14 @@ class Element(Plugin):
self._update_state()
if self._get_workspace() and self._cached():
+ #
+ # Note that this block can only happen in the
+ # main process, since `self._cached()` cannot
+ # be true when assembly is completed in the task.
+ #
+ # For this reason, it is safe to update and
+ # save the workspaces configuration
+ #
project = self._get_project()
key = self._get_cache_key()
workspace = self._get_workspace()
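The element.py changes above guard the `save_config()` call with
`utils._is_main_process()`, so staging performed inside a child job never
writes the workspace configuration itself. One way such a guard can be
implemented (an assumption for illustration, not necessarily how
buildstream.utils does it):

import os

_MAIN_PID = os.getpid()   # recorded once, when the module is first imported


def _is_main_process():
    # True in the frontend process that imported this module,
    # False in any forked child job.
    return os.getpid() == _MAIN_PID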