diff options
-rw-r--r-- | src/buildstream/_scheduler/jobs/cachesizejob.py | 17 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/cleanupjob.py | 23 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/elementjob.py | 19 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 158 |
4 files changed, 153 insertions, 64 deletions
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py index 5f27b7fc1..55711da01 100644 --- a/src/buildstream/_scheduler/jobs/cachesizejob.py +++ b/src/buildstream/_scheduler/jobs/cachesizejob.py @@ -16,7 +16,7 @@ # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # -from .job import Job, JobStatus +from .job import Job, JobStatus, ChildJob class CacheSizeJob(Job): @@ -27,9 +27,6 @@ class CacheSizeJob(Job): context = self._scheduler.context self._casquota = context.get_casquota() - def child_process(self): - return self._casquota.compute_cache_size() - def parent_complete(self, status, result): if status == JobStatus.OK: self._casquota.set_cache_size(result) @@ -37,5 +34,17 @@ class CacheSizeJob(Job): if self._complete_cb: self._complete_cb(status, result) + def create_child_job(self): + return ChildCacheSizeJob(self._scheduler.context._casquota) + + +class ChildCacheSizeJob(ChildJob): + def __init__(self, casquota): + super().__init__() + self._casquota = casquota + + def child_process(self): + return self._casquota.compute_cache_size() + def child_process_data(self): return {} diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py index 4764b30b3..325b9baa9 100644 --- a/src/buildstream/_scheduler/jobs/cleanupjob.py +++ b/src/buildstream/_scheduler/jobs/cleanupjob.py @@ -16,7 +16,7 @@ # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # -from .job import Job, JobStatus +from .job import Job, JobStatus, ChildJob class CleanupJob(Job): @@ -27,12 +27,6 @@ class CleanupJob(Job): context = self._scheduler.context self._casquota = context.get_casquota() - def child_process(self): - def progress(): - self.send_message('update-cache-size', - self._casquota.get_cache_size()) - return self._casquota.clean(progress) - def handle_message(self, message_type, message): # Update the cache size in the main process as we go, # this provides better feedback in the UI. @@ -48,3 +42,18 @@ class CleanupJob(Job): if self._complete_cb: self._complete_cb(status, result) + + def create_child_job(self): + return ChildCacheSizeJob(self._scheduler.context.get_casquota()) + + +class ChildCleanupJob(ChildJob): + def __init__(self, casquota): + super().__init__() + self._casquota = casquota + + def child_process(self): + def progress(): + self.send_message('update-cache-size', + self._casquota.get_cache_size()) + return self._casquota.clean(progress) diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py index d6aa81567..29ff2b127 100644 --- a/src/buildstream/_scheduler/jobs/elementjob.py +++ b/src/buildstream/_scheduler/jobs/elementjob.py @@ -20,7 +20,7 @@ from ruamel import yaml from ..._message import MessageType -from .job import Job +from .job import Job, ChildJob # ElementJob() @@ -80,6 +80,20 @@ class ElementJob(Job): def element(self): return self._element + def parent_complete(self, status, result): + self._complete_cb(self, self._element, status, self._result) + + def create_child_job(self): + return ChildElementJob(self._element, self._action_cb) + + +class ChildElementJob(ChildJob): + + def __init__(self, element, action_cb): + super().__init__() + self._element = element + self._action_cb = action_cb + def child_process(self): # Print the element's environment at the beginning of any element's log file. @@ -94,9 +108,6 @@ class ElementJob(Job): # Run the action return self._action_cb(self._element) - def parent_complete(self, status, result): - self._complete_cb(self, self._element, status, self._result) - def child_process_data(self): data = {} diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index ee01a92dc..a899b8952 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -76,8 +76,8 @@ class Process(multiprocessing.Process): # Job() # # The Job object represents a parallel task. When calling Job.spawn(), -# the given `Job.child_process()` will be called in parallel to the -# calling process, and `Job.parent_complete()` will be called with the +# the given `Job.create_child_job()` will be called to run a job in parallel to +# the calling process, and `Job.parent_complete()` will be called with the # action result in the calling process when the job completes. # # Args: @@ -126,8 +126,25 @@ class Job(): self._tries += 1 self._parent_start_listening() + child_job = self.create_child_job() + + child_job.setup( + self.action_name, + self._scheduler, + self._queue, + self._max_retries, + self._tries, + self._logfile, + self._message_unique_id, + self._task_id, + ) + # Spawn the process - self._process = Process(target=self._child_action, args=[self._queue]) + # + # We can call a private method in the child job because these two + # classes go together. + # + self._process = Process(target=child_job._child_action) # Block signals which are handled in the main process such that # the child process does not inherit the parent's state, but the main @@ -306,17 +323,6 @@ class Job(): self._scheduler.context.message( Message(unique_id, message_type, message, **kwargs)) - # send_message() - # - # To be called from inside Job.child_process() implementations - # to send messages to the main process during processing. - # - # These messages will be processed by the class's Job.handle_message() - # implementation. - # - def send_message(self, message_type, message): - self._queue.put(_Envelope(message_type, message)) - ####################################################### # Abstract Methods # ####################################################### @@ -339,8 +345,8 @@ class Job(): # parent_complete() # - # This will be executed after the job finishes, and is expected to - # pass the result to the main thread. + # This will be executed in the main process after the job finishes, and is + # expected to pass the result to the main thread. # # Args: # status (JobStatus): The job exit status @@ -350,44 +356,22 @@ class Job(): raise ImplError("Job '{kind}' does not implement parent_complete()" .format(kind=type(self).__name__)) - # child_process() - # - # This will be executed after fork(), and is intended to perform - # the job's task. - # - # Returns: - # (any): A (simple!) object to be returned to the main thread - # as the result. - # - def child_process(self): - raise ImplError("Job '{kind}' does not implement child_process()" - .format(kind=type(self).__name__)) - - # child_process_data() + # create_child_job() # - # Abstract method to retrieve additional data that should be - # returned to the parent process. Note that the job result is - # retrieved independently. + # Called by Job to create an object that will be run in a child process. # - # Values can later be retrieved in Job.child_data. + # This must be overridden by Job subclasses. # # Returns: - # (dict) A dict containing values to be reported to the main process + # (ChildJob): An instance of a ChildJob subclass # - def child_process_data(self): - return {} + def create_child_job(self): + raise ImplError("Job '{kind}' does not implement create_child_job()" + .format(kind=type(self).__name__)) ####################################################### # Local Private Methods # ####################################################### - # - # Methods prefixed with the word 'child' take place in the child process - # - # Methods prefixed with the word 'parent' take place in the parent process - # - # Other methods can be called in both child or parent processes - # - ####################################################### # _parent_shutdown() # @@ -520,6 +504,85 @@ class Job(): self._scheduler.loop.remove_reader(self._queue._reader.fileno()) self._listening = False + +class ChildJob(): + + def setup( + self, action_name, scheduler, queue, max_retries, tries, logfile, message_unique_id, task_id): + + self.action_name = action_name + self._scheduler = scheduler + self._queue = queue + self._max_retries = max_retries + self._tries = tries + self._retry_flag = True + self._logfile = logfile + self._message_unique_id = message_unique_id + self._task_id = task_id + + # message(): + # + # Logs a message, this will be logged in the task's logfile and + # conditionally also be sent to the frontend. + # + # Args: + # message_type (MessageType): The type of message to send + # message (str): The message + # kwargs: Remaining Message() constructor arguments, note that you can + # override 'unique_id' this way. + # + def message(self, message_type, message, **kwargs): + kwargs['scheduler'] = True + unique_id = self._message_unique_id + if "unique_id" in kwargs: + unique_id = kwargs["unique_id"] + del kwargs["unique_id"] + self._scheduler.context.message( + Message(unique_id, message_type, message, **kwargs)) + + # send_message() + # + # These messages will be processed by the class's Job.handle_message() + # implementation. + # + def send_message(self, message_type, message): + self._queue.put(_Envelope(message_type, message)) + + ####################################################### + # Abstract Methods # + ####################################################### + + # child_process() + # + # This will be executed after fork(), and is intended to perform + # the job's task. + # + # Returns: + # (any): A (simple!) object to be returned to the main thread + # as the result. + # + def child_process(self): + raise ImplError("Job '{kind}' does not implement child_process()" + .format(kind=type(self).__name__)) + + # child_process_data() + # + # Abstract method to retrieve additional data that should be + # returned to the parent process. Note that the job result is + # retrieved independently. + # + # Values can later be retrieved in Job.child_data. + # + # Returns: + # (dict) A dict containing values to be reported to the main process + # + def child_process_data(self): + return {} + + ####################################################### + # Local Private Methods # + ####################################################### + # _child_action() # # Perform the action in the child process, this calls the action_cb. @@ -527,7 +590,7 @@ class Job(): # Args: # queue (multiprocessing.Queue): The message queue for IPC # - def _child_action(self, queue): + def _child_action(self): # This avoids some SIGTSTP signals from grandchildren # getting propagated up to the master process @@ -541,11 +604,8 @@ class Job(): signal.signal(sig, signal.SIG_DFL) signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list) - # Assign the queue we passed across the process boundaries - # # Set the global message handler in this child # process to forward messages to the parent process - self._queue = queue self._scheduler.context.set_message_handler(self._child_message_handler) starttime = datetime.datetime.now() |