Diffstat (limited to 'buildstream/_scheduler/jobs')
-rw-r--r--  buildstream/_scheduler/jobs/cachesizejob.py |  5
-rw-r--r--  buildstream/_scheduler/jobs/cleanupjob.py   | 20
-rw-r--r--  buildstream/_scheduler/jobs/job.py          | 69
3 files changed, 68 insertions, 26 deletions
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
index fb56ca016..a96b92353 100644
--- a/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -17,7 +17,6 @@
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
from .job import Job, JobStatus
-from ..._platform import Platform
class CacheSizeJob(Job):
@@ -25,8 +24,8 @@ class CacheSizeJob(Job):
super().__init__(*args, **kwargs)
self._complete_cb = complete_cb
- platform = Platform.get_platform()
- self._artifacts = platform.artifactcache
+ context = self._scheduler.context
+ self._artifacts = context.artifactcache
def child_process(self):
return self._artifacts.compute_cache_size()
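
Both jobs in this commit stop importing the Platform singleton and reach the artifact cache through the scheduler's Context instead. A minimal sketch of the resulting pattern for a Job subclass (the class name is hypothetical; the constructor mirrors the ones shown in this diff):

from .job import Job


class ExampleCacheJob(Job):
    def __init__(self, *args, complete_cb, **kwargs):
        super().__init__(*args, **kwargs)
        self._complete_cb = complete_cb

        # The scheduler carries the Context, and the Context owns the
        # artifact cache, so no Platform import is needed anymore.
        context = self._scheduler.context
        self._artifacts = context.artifactcache

    def child_process(self):
        # Runs in the child process; the return value travels back to
        # the main process as the job result.
        return self._artifacts.compute_cache_size()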
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
index 97b45901f..a1d49f339 100644
--- a/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -17,7 +17,6 @@
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
from .job import Job, JobStatus
-from ..._platform import Platform
class CleanupJob(Job):
@@ -25,11 +24,24 @@ class CleanupJob(Job):
super().__init__(*args, **kwargs)
self._complete_cb = complete_cb
- platform = Platform.get_platform()
- self._artifacts = platform.artifactcache
+ context = self._scheduler.context
+ self._artifacts = context.artifactcache
def child_process(self):
- return self._artifacts.clean()
+ def progress():
+ self.send_message('update-cache-size',
+ self._artifacts.get_cache_size())
+ return self._artifacts.clean(progress)
+
+ def handle_message(self, message_type, message):
+
+ # Update the cache size in the main process as we go;
+ # this provides better feedback in the UI.
+ if message_type == 'update-cache-size':
+ self._artifacts.set_cache_size(message)
+ return True
+
+ return False
def parent_complete(self, status, result):
if status == JobStatus.OK:
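
The progress argument passed to clean() above is a zero-argument callable, so the artifact cache is expected to invoke it periodically while it removes artifacts. A minimal sketch of what such a clean() implementation could look like; the helper methods and the return value are assumptions for illustration, not taken from this diff:

def clean(self, progress=None):
    # Remove expired artifacts one by one, letting the caller observe
    # intermediate progress through the callback.
    for ref in self._list_expired_refs():    # hypothetical helper
        self._remove_ref(ref)                # hypothetical helper
        if progress is not None:
            progress()

    # Assumed here: the final cache size becomes the job result
    return self.compute_cache_size()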
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 348204750..b8b4a2c76 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -58,10 +58,10 @@ class JobStatus():
# Used to distinguish between status messages and return values
-class Envelope():
+class _Envelope():
def __init__(self, message_type, message):
- self._message_type = message_type
- self._message = message
+ self.message_type = message_type
+ self.message = message
# Process class that doesn't call waitpid on its own.
@@ -263,10 +263,37 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
+ # send_message()
+ #
+ # To be called from inside Job.child_process() implementations
+ # to send messages to the main process during processing.
+ #
+ # These messages will be processed by the class's Job.handle_message()
+ # implementation.
+ #
+ def send_message(self, message_type, message):
+ self._queue.put(_Envelope(message_type, message))
+
#######################################################
# Abstract Methods #
#######################################################
+ # handle_message()
+ #
+ # Handle a custom message. This will be called in the main process in
+ # response to any messages sent to the main process using the
+ # Job.send_message() API from inside a Job.child_process() implementation.
+ #
+ # Args:
+ # message_type (str): A string to identify the message type
+ # message (any): A simple serializable object
+ #
+ # Returns:
+ # (bool): Should return a truthy value if message_type is handled.
+ #
+ def handle_message(self, message_type, message):
+ return False
+
# parent_complete()
#
# This will be executed after the job finishes, and is expected to
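
The handle_message() contract documented above is available to any Job subclass, not just CleanupJob. A minimal sketch with a hypothetical job and message type:

from .job import Job


class ExampleProgressJob(Job):
    def child_process(self):
        total = 4
        for step in range(total):
            # ... perform one unit of work in the child process ...
            self.send_message('example-progress', (step + 1, total))
        return total

    def handle_message(self, message_type, message):
        # Called in the main process; return a truthy value only for
        # message types this job owns, anything else stays unhandled.
        if message_type == 'example-progress':
            self._example_progress = message
            return True
        return False

    def parent_complete(self, status, result):
        # Nothing extra to do once the job finishes in this sketch
        pass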
@@ -404,7 +431,7 @@ class Job():
elapsed=elapsed, detail=e.detail,
logfile=filename, sandbox=e.sandbox)
- self._queue.put(Envelope('child_data', self.child_process_data()))
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
# Report the exception to the parent (for internal testing purposes)
self._child_send_error(e)
@@ -430,7 +457,7 @@ class Job():
else:
# No exception occurred in the action
- self._queue.put(Envelope('child_data', self.child_process_data()))
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
@@ -457,7 +484,7 @@ class Job():
domain = e.domain
reason = e.reason
- envelope = Envelope('error', {
+ envelope = _Envelope('error', {
'domain': domain,
'reason': reason
})
@@ -475,7 +502,7 @@ class Job():
#
def _child_send_result(self, result):
if result is not None:
- envelope = Envelope('result', result)
+ envelope = _Envelope('result', result)
self._queue.put(envelope)
# _child_shutdown()
@@ -512,7 +539,7 @@ class Job():
if message.message_type == MessageType.LOG:
return
- self._queue.put(Envelope('message', message))
+ self._queue.put(_Envelope('message', message))
# _parent_shutdown()
#
@@ -573,24 +600,28 @@ class Job():
if not self._listening:
return
- if envelope._message_type == 'message':
+ if envelope.message_type == 'message':
# Propagate received messages from children
# back through the context.
- self._scheduler.context.message(envelope._message)
- elif envelope._message_type == 'error':
+ self._scheduler.context.message(envelope.message)
+ elif envelope.message_type == 'error':
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
- set_last_task_error(envelope._message['domain'],
- envelope._message['reason'])
- elif envelope._message_type == 'result':
+ set_last_task_error(envelope.message['domain'],
+ envelope.message['reason'])
+ elif envelope.message_type == 'result':
assert self._result is None
- self._result = envelope._message
- elif envelope._message_type == 'child_data':
+ self._result = envelope.message
+ elif envelope.message_type == 'child_data':
# If we retry a job, we assign a new value to this
- self.child_data = envelope._message
- else:
- raise Exception()
+ self.child_data = envelope.message
+
+ # Try Job subclass specific messages now
+ elif not self.handle_message(envelope.message_type,
+ envelope.message):
+ assert 0, "Unhandled message type '{}': {}" \
+ .format(envelope.message_type, envelope.message)
# _parent_process_queue()
#
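
Taken together, the send_message() helper and this dispatch hunk give child processes a small custom messaging channel: the built-in envelope types ('message', 'error', 'result', 'child_data') are handled by the base class, anything else is offered to handle_message(), and an unclaimed type fails loudly. A self-contained sketch of that flow using only the standard library (all names here are hypothetical stand-ins, not BuildStream APIs):

import multiprocessing


class _Envelope():
    def __init__(self, message_type, message):
        self.message_type = message_type
        self.message = message


class MiniJob():
    def __init__(self):
        self._queue = multiprocessing.Queue()
        self._result = None

    # Child side: wrap the payload in an envelope and ship it to the parent
    def send_message(self, message_type, message):
        self._queue.put(_Envelope(message_type, message))

    # Subclasses claim their own message types by overriding this
    def handle_message(self, message_type, message):
        return False

    # Parent side: built-in types first, then the subclass hook, and an
    # unclaimed type fails loudly instead of being silently dropped
    def dispatch(self, envelope):
        if envelope.message_type == 'result':
            self._result = envelope.message
        elif not self.handle_message(envelope.message_type, envelope.message):
            assert 0, "Unhandled message type '{}': {}" \
                .format(envelope.message_type, envelope.message)


class MiniCleanupJob(MiniJob):
    def __init__(self):
        super().__init__()
        self.cache_size = None

    def handle_message(self, message_type, message):
        if message_type == 'update-cache-size':
            self.cache_size = message
            return True
        return False


def child_main(job):
    # Pretend to clean the cache, reporting its size as we go
    job.send_message('update-cache-size', 4096)
    job.send_message('result', 4096)


if __name__ == '__main__':
    job = MiniCleanupJob()
    proc = multiprocessing.Process(target=child_main, args=(job,))
    proc.start()
    proc.join()

    # Two envelopes were sent above; dispatch both in the parent
    for _ in range(2):
        job.dispatch(job._queue.get())

    print(job.cache_size, job._result)   # -> 4096 4096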