diff options
Diffstat (limited to 'buildstream/_scheduler/jobs')
-rw-r--r-- | buildstream/_scheduler/jobs/cachesizejob.py | 5 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/cleanupjob.py | 20 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 69 |
3 files changed, 68 insertions, 26 deletions
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py index fb56ca016..a96b92353 100644 --- a/buildstream/_scheduler/jobs/cachesizejob.py +++ b/buildstream/_scheduler/jobs/cachesizejob.py @@ -17,7 +17,6 @@ # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # from .job import Job, JobStatus -from ..._platform import Platform class CacheSizeJob(Job): @@ -25,8 +24,8 @@ class CacheSizeJob(Job): super().__init__(*args, **kwargs) self._complete_cb = complete_cb - platform = Platform.get_platform() - self._artifacts = platform.artifactcache + context = self._scheduler.context + self._artifacts = context.artifactcache def child_process(self): return self._artifacts.compute_cache_size() diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py index 97b45901f..a1d49f339 100644 --- a/buildstream/_scheduler/jobs/cleanupjob.py +++ b/buildstream/_scheduler/jobs/cleanupjob.py @@ -17,7 +17,6 @@ # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # from .job import Job, JobStatus -from ..._platform import Platform class CleanupJob(Job): @@ -25,11 +24,24 @@ class CleanupJob(Job): super().__init__(*args, **kwargs) self._complete_cb = complete_cb - platform = Platform.get_platform() - self._artifacts = platform.artifactcache + context = self._scheduler.context + self._artifacts = context.artifactcache def child_process(self): - return self._artifacts.clean() + def progress(): + self.send_message('update-cache-size', + self._artifacts.get_cache_size()) + return self._artifacts.clean(progress) + + def handle_message(self, message_type, message): + + # Update the cache size in the main process as we go, + # this provides better feedback in the UI. + if message_type == 'update-cache-size': + self._artifacts.set_cache_size(message) + return True + + return False def parent_complete(self, status, result): if status == JobStatus.OK: diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index 348204750..b8b4a2c76 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -58,10 +58,10 @@ class JobStatus(): # Used to distinguish between status messages and return values -class Envelope(): +class _Envelope(): def __init__(self, message_type, message): - self._message_type = message_type - self._message = message + self.message_type = message_type + self.message = message # Process class that doesn't call waitpid on its own. @@ -263,10 +263,37 @@ class Job(): def set_task_id(self, task_id): self._task_id = task_id + # send_message() + # + # To be called from inside Job.child_process() implementations + # to send messages to the main process during processing. + # + # These messages will be processed by the class's Job.handle_message() + # implementation. + # + def send_message(self, message_type, message): + self._queue.put(_Envelope(message_type, message)) + ####################################################### # Abstract Methods # ####################################################### + # handle_message() + # + # Handle a custom message. This will be called in the main process in + # response to any messages sent to the main proces using the + # Job.send_message() API from inside a Job.child_process() implementation + # + # Args: + # message_type (str): A string to identify the message type + # message (any): A simple serializable object + # + # Returns: + # (bool): Should return a truthy value if message_type is handled. + # + def handle_message(self, message_type, message): + return False + # parent_complete() # # This will be executed after the job finishes, and is expected to @@ -404,7 +431,7 @@ class Job(): elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox) - self._queue.put(Envelope('child_data', self.child_process_data())) + self._queue.put(_Envelope('child_data', self.child_process_data())) # Report the exception to the parent (for internal testing purposes) self._child_send_error(e) @@ -430,7 +457,7 @@ class Job(): else: # No exception occurred in the action - self._queue.put(Envelope('child_data', self.child_process_data())) + self._queue.put(_Envelope('child_data', self.child_process_data())) self._child_send_result(result) elapsed = datetime.datetime.now() - starttime @@ -457,7 +484,7 @@ class Job(): domain = e.domain reason = e.reason - envelope = Envelope('error', { + envelope = _Envelope('error', { 'domain': domain, 'reason': reason }) @@ -475,7 +502,7 @@ class Job(): # def _child_send_result(self, result): if result is not None: - envelope = Envelope('result', result) + envelope = _Envelope('result', result) self._queue.put(envelope) # _child_shutdown() @@ -512,7 +539,7 @@ class Job(): if message.message_type == MessageType.LOG: return - self._queue.put(Envelope('message', message)) + self._queue.put(_Envelope('message', message)) # _parent_shutdown() # @@ -573,24 +600,28 @@ class Job(): if not self._listening: return - if envelope._message_type == 'message': + if envelope.message_type == 'message': # Propagate received messages from children # back through the context. - self._scheduler.context.message(envelope._message) - elif envelope._message_type == 'error': + self._scheduler.context.message(envelope.message) + elif envelope.message_type == 'error': # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state # is currently managed in _exceptions.py - set_last_task_error(envelope._message['domain'], - envelope._message['reason']) - elif envelope._message_type == 'result': + set_last_task_error(envelope.message['domain'], + envelope.message['reason']) + elif envelope.message_type == 'result': assert self._result is None - self._result = envelope._message - elif envelope._message_type == 'child_data': + self._result = envelope.message + elif envelope.message_type == 'child_data': # If we retry a job, we assign a new value to this - self.child_data = envelope._message - else: - raise Exception() + self.child_data = envelope.message + + # Try Job subclass specific messages now + elif not self.handle_message(envelope.message_type, + envelope.message): + assert 0, "Unhandled message type '{}': {}" \ + .format(envelope.message_type, envelope.message) # _parent_process_queue() # |