diff options
-rw-r--r-- | src/buildstream/_scheduler/jobs/cachesizejob.py | 17 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/cleanupjob.py | 23 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/elementjob.py | 18 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 199 |
4 files changed, 195 insertions, 62 deletions
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py index 81c012895..ed1cc4131 100644 --- a/src/buildstream/_scheduler/jobs/cachesizejob.py +++ b/src/buildstream/_scheduler/jobs/cachesizejob.py @@ -16,7 +16,7 @@ # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # -from .job import Job, JobStatus +from .job import Job, JobStatus, ChildJob class CacheSizeJob(Job): @@ -27,12 +27,21 @@ class CacheSizeJob(Job): context = self._scheduler.context self._casquota = context.get_casquota() - def child_process(self): - return self._casquota.compute_cache_size() - def parent_complete(self, status, result): if status == JobStatus.OK: self._casquota.set_cache_size(result) if self._complete_cb: self._complete_cb(status, result) + + def create_child_job(self, *args, **kwargs): + return ChildCacheSizeJob(*args, casquota=self._scheduler.context._casquota, **kwargs) + + +class ChildCacheSizeJob(ChildJob): + def __init__(self, *args, casquota, **kwargs): + super().__init__(*args, **kwargs) + self._casquota = casquota + + def child_process(self): + return self._casquota.compute_cache_size() diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py index 4764b30b3..672e784bc 100644 --- a/src/buildstream/_scheduler/jobs/cleanupjob.py +++ b/src/buildstream/_scheduler/jobs/cleanupjob.py @@ -16,7 +16,7 @@ # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # -from .job import Job, JobStatus +from .job import Job, JobStatus, ChildJob class CleanupJob(Job): @@ -27,12 +27,6 @@ class CleanupJob(Job): context = self._scheduler.context self._casquota = context.get_casquota() - def child_process(self): - def progress(): - self.send_message('update-cache-size', - self._casquota.get_cache_size()) - return self._casquota.clean(progress) - def handle_message(self, message_type, message): # Update the cache size in the main process as we go, # this provides better feedback 
in the UI. @@ -48,3 +42,18 @@ class CleanupJob(Job): if self._complete_cb: self._complete_cb(status, result) + + def create_child_job(self, *args, **kwargs): + return ChildCleanupJob(*args, casquota=self._scheduler.context.get_casquota(), **kwargs) + + +class ChildCleanupJob(ChildJob): + def __init__(self, *args, casquota, **kwargs): + super().__init__(*args, **kwargs) + self._casquota = casquota + + def child_process(self): + def progress(): + self.send_message('update-cache-size', + self._casquota.get_cache_size()) + return self._casquota.clean(progress) diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py index d6aa81567..2be0aa8f9 100644 --- a/src/buildstream/_scheduler/jobs/elementjob.py +++ b/src/buildstream/_scheduler/jobs/elementjob.py @@ -20,7 +20,7 @@ from ruamel import yaml from ..._message import MessageType -from .job import Job +from .job import Job, ChildJob # ElementJob() @@ -80,6 +80,19 @@ class ElementJob(Job): def element(self): return self._element + def parent_complete(self, status, result): + self._complete_cb(self, self._element, status, self._result) + + def create_child_job(self, *args, **kwargs): + return ChildElementJob(*args, element=self._element, action_cb=self._action_cb, **kwargs) + + +class ChildElementJob(ChildJob): + def __init__(self, *args, element, action_cb, **kwargs): + super().__init__(*args, **kwargs) + self._element = element + self._action_cb = action_cb + def child_process(self): # Print the element's environment at the beginning of any element's log file. 
@@ -94,9 +107,6 @@ class ElementJob(Job): # Run the action return self._action_cb(self._element) - def parent_complete(self, status, result): - self._complete_cb(self, self._element, status, self._result) - def child_process_data(self): data = {} diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index ee01a92dc..df9d3a8bc 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -75,10 +75,31 @@ class Process(multiprocessing.Process): # Job() # -# The Job object represents a parallel task. When calling Job.spawn(), -# the given `Job.child_process()` will be called in parallel to the -# calling process, and `Job.parent_complete()` will be called with the -# action result in the calling process when the job completes. +# The Job object represents a task that will run in parallel to the main +# process. It has some methods that are not implemented - they are meant for +# you to implement in a subclass. +# +# It has a close relationship with the ChildJob class, and it can be considered +# a two part solution: +# +# 1. A Job instance, which will create a ChildJob instance and arrange for +# childjob.child_process() to be executed in another process. +# 2. The created ChildJob instance, which does the actual work. +# +# This split makes it clear what data is passed to the other process and what +# is executed in which process. +# +# To set up a minimal new kind of Job, e.g. YourJob: +# +# 1. Create a YourJob class, inheriting from Job. +# 2. Create a YourChildJob class, inheriting from ChildJob. +# 3. Implement YourJob.create_child_job() and YourJob.parent_complete(). +# 4. Implement YourChildJob.child_process(). +# +# A Job instance and its ChildJob share a message queue. You may send custom +# messages to the main process using YourChildJob.send_message(). Such messages +# must be processed in YourJob.handle_message(), which you will also need to +# override for this purpose. 
# # Args: # scheduler (Scheduler): The scheduler @@ -126,8 +147,18 @@ class Job(): self._tries += 1 self._parent_start_listening() + child_job = self.create_child_job( # pylint: disable=assignment-from-no-return + self._scheduler, + self.action_name, + self._logfile, + self._max_retries, + self._tries, + self._message_unique_id, + self._task_id, + ) + # Spawn the process - self._process = Process(target=self._child_action, args=[self._queue]) + self._process = Process(target=child_job.child_action, args=[self._queue]) # Block signals which are handled in the main process such that # the child process does not inherit the parent's state, but the main @@ -306,17 +337,6 @@ class Job(): self._scheduler.context.message( Message(unique_id, message_type, message, **kwargs)) - # send_message() - # - # To be called from inside Job.child_process() implementations - # to send messages to the main process during processing. - # - # These messages will be processed by the class's Job.handle_message() - # implementation. - # - def send_message(self, message_type, message): - self._queue.put(_Envelope(message_type, message)) - ####################################################### # Abstract Methods # ####################################################### @@ -339,8 +359,8 @@ class Job(): # parent_complete() # - # This will be executed after the job finishes, and is expected to - # pass the result to the main thread. + # This will be executed in the main process after the job finishes, and is + # expected to pass the result to the main thread. # # Args: # status (JobStatus): The job exit status @@ -350,44 +370,28 @@ class Job(): raise ImplError("Job '{kind}' does not implement parent_complete()" .format(kind=type(self).__name__)) - # child_process() + # create_child_job() # - # This will be executed after fork(), and is intended to perform - # the job's task. + # Called by a Job instance to create a child job. # - # Returns: - # (any): A (simple!) 
object to be returned to the main thread - # as the result. - # - def child_process(self): - raise ImplError("Job '{kind}' does not implement child_process()" - .format(kind=type(self).__name__)) - - # child_process_data() + # The child job object is an instance of a subclass of ChildJob. # - # Abstract method to retrieve additional data that should be - # returned to the parent process. Note that the job result is - # retrieved independently. + # The child job object's child_process() method will be executed in another + # process, so that work is done in parallel. See the documentation for the + # Job class for more information on this relationship. # - # Values can later be retrieved in Job.child_data. + # This method must be overridden by Job subclasses. # # Returns: - # (dict) A dict containing values to be reported to the main process + # (ChildJob): An instance of a subclass of ChildJob. # - def child_process_data(self): - return {} + def create_child_job(self, *args, **kwargs): + raise ImplError("Job '{kind}' does not implement create_child_job()" + .format(kind=type(self).__name__)) ####################################################### # Local Private Methods # ####################################################### - # - # Methods prefixed with the word 'child' take place in the child process - # - # Methods prefixed with the word 'parent' take place in the parent process - # - # Other methods can be called in both child or parent processes - # - ####################################################### # _parent_shutdown() # @@ -520,14 +524,111 @@ class Job(): self._scheduler.loop.remove_reader(self._queue._reader.fileno()) self._listening = False - # _child_action() + +# ChildJob() +# +# The ChildJob object represents the part of a parallel task that will run in a +# separate process. It has a close relationship with the parent Job that +# created it. 
+# +# See the documentation of the Job class for more on their relationship, and +# how to set up a (Job, ChildJob) pair. +# +# The args below are passed from the parent Job to the ChildJob. +# +# Args: +# scheduler (Scheduler): The scheduler. +# action_name (str): The queue action name. +# logfile (str): A template string that points to the logfile +# that should be used - should contain {pid}. +# max_retries (int): The maximum number of retries. +# tries (int): The number of retries so far. +# message_unique_id (int): None, or the id to be supplied to the Message() constructor. +# task_id (int): None, or the plugin identifier for this job. +# +class ChildJob(): + + def __init__( + self, scheduler, action_name, logfile, max_retries, tries, message_unique_id, task_id): + + self.action_name = action_name + + self._scheduler = scheduler + self._logfile = logfile + self._max_retries = max_retries + self._tries = tries + self._message_unique_id = message_unique_id + self._task_id = task_id + + self._queue = None + + # message(): + # + # Logs a message, this will be logged in the task's logfile and + # conditionally also be sent to the frontend. + # + # Args: + # message_type (MessageType): The type of message to send + # message (str): The message + # kwargs: Remaining Message() constructor arguments, note that you can + # override 'unique_id' this way. + # + def message(self, message_type, message, **kwargs): + kwargs['scheduler'] = True + unique_id = self._message_unique_id + if "unique_id" in kwargs: + unique_id = kwargs["unique_id"] + del kwargs["unique_id"] + self._scheduler.context.message( + Message(unique_id, message_type, message, **kwargs)) + + # send_message() + # + # These messages will be processed by the Job.handle_message() + # implementation.
+ # + def send_message(self, message_type, message): + self._queue.put(_Envelope(message_type, message)) + + ####################################################### + # Abstract Methods # + ####################################################### + + # child_process() + # + # This will be executed after fork(), and is intended to perform + # the job's task. + # + # Returns: + # (any): A (simple!) object to be returned to the parent Job running + # in the main process. This is taken as the result of the Job. + # + def child_process(self): + raise ImplError("ChildJob '{kind}' does not implement child_process()" + .format(kind=type(self).__name__)) + + # child_process_data() + # + # Abstract method to retrieve additional data that should be + # returned to the parent process. Note that the job result is + # retrieved independently. + # + # Values can later be retrieved in Job.child_data. + # + # Returns: + # (dict) A dict containing values to be reported to the main process + # + def child_process_data(self): + return {} + + # child_action() # # Perform the action in the child process, this calls the action_cb. # # Args: # queue (multiprocessing.Queue): The message queue for IPC # - def _child_action(self, queue): + def child_action(self, queue): # This avoids some SIGTSTP signals from grandchildren # getting propagated up to the master process @@ -628,6 +729,10 @@ class Job(): # is already busy in sys.exit() self._child_shutdown(RC_OK) + ####################################################### + # Local Private Methods # + ####################################################### + # _child_send_error() # # Sends an error to the main process through the message queue |