summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/buildstream/_scheduler/jobs/cachesizejob.py17
-rw-r--r--src/buildstream/_scheduler/jobs/cleanupjob.py23
-rw-r--r--src/buildstream/_scheduler/jobs/elementjob.py18
-rw-r--r--src/buildstream/_scheduler/jobs/job.py199
4 files changed, 195 insertions, 62 deletions
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py
index 81c012895..ed1cc4131 100644
--- a/src/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/src/buildstream/_scheduler/jobs/cachesizejob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job, JobStatus
+from .job import Job, JobStatus, ChildJob
class CacheSizeJob(Job):
@@ -27,12 +27,21 @@ class CacheSizeJob(Job):
context = self._scheduler.context
self._casquota = context.get_casquota()
- def child_process(self):
- return self._casquota.compute_cache_size()
-
def parent_complete(self, status, result):
if status == JobStatus.OK:
self._casquota.set_cache_size(result)
if self._complete_cb:
self._complete_cb(status, result)
+
+ def create_child_job(self, *args, **kwargs):
+ return ChildCacheSizeJob(*args, casquota=self._scheduler.context._casquota, **kwargs)
+
+
+class ChildCacheSizeJob(ChildJob):
+ def __init__(self, *args, casquota, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._casquota = casquota
+
+ def child_process(self):
+ return self._casquota.compute_cache_size()
diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py
index 4764b30b3..672e784bc 100644
--- a/src/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/src/buildstream/_scheduler/jobs/cleanupjob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job, JobStatus
+from .job import Job, JobStatus, ChildJob
class CleanupJob(Job):
@@ -27,12 +27,6 @@ class CleanupJob(Job):
context = self._scheduler.context
self._casquota = context.get_casquota()
- def child_process(self):
- def progress():
- self.send_message('update-cache-size',
- self._casquota.get_cache_size())
- return self._casquota.clean(progress)
-
def handle_message(self, message_type, message):
# Update the cache size in the main process as we go,
# this provides better feedback in the UI.
@@ -48,3 +42,18 @@ class CleanupJob(Job):
if self._complete_cb:
self._complete_cb(status, result)
+
+ def create_child_job(self, *args, **kwargs):
+ return ChildCleanupJob(*args, casquota=self._scheduler.context.get_casquota(), **kwargs)
+
+
+class ChildCleanupJob(ChildJob):
+ def __init__(self, *args, casquota, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._casquota = casquota
+
+ def child_process(self):
+ def progress():
+ self.send_message('update-cache-size',
+ self._casquota.get_cache_size())
+ return self._casquota.clean(progress)
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index d6aa81567..2be0aa8f9 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -20,7 +20,7 @@ from ruamel import yaml
from ..._message import MessageType
-from .job import Job
+from .job import Job, ChildJob
# ElementJob()
@@ -80,6 +80,19 @@ class ElementJob(Job):
def element(self):
return self._element
+ def parent_complete(self, status, result):
+ self._complete_cb(self, self._element, status, self._result)
+
+ def create_child_job(self, *args, **kwargs):
+ return ChildElementJob(*args, element=self._element, action_cb=self._action_cb, **kwargs)
+
+
+class ChildElementJob(ChildJob):
+ def __init__(self, *args, element, action_cb, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._element = element
+ self._action_cb = action_cb
+
def child_process(self):
# Print the element's environment at the beginning of any element's log file.
@@ -94,9 +107,6 @@ class ElementJob(Job):
# Run the action
return self._action_cb(self._element)
- def parent_complete(self, status, result):
- self._complete_cb(self, self._element, status, self._result)
-
def child_process_data(self):
data = {}
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index ee01a92dc..df9d3a8bc 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -75,10 +75,31 @@ class Process(multiprocessing.Process):
# Job()
#
-# The Job object represents a parallel task. When calling Job.spawn(),
-# the given `Job.child_process()` will be called in parallel to the
-# calling process, and `Job.parent_complete()` will be called with the
-# action result in the calling process when the job completes.
+# The Job object represents a task that will run in parallel to the main
+# process. It has some methods that are not implemented - they are meant for
+# you to implement in a subclass.
+#
+# It has a close relationship with the ChildJob class, and it can be considered
+# a two part solution:
+#
+# 1. A Job instance, which will create a ChildJob instance and arrange for
+# childjob.child_process() to be executed in another process.
+# 2. The created ChildJob instance, which does the actual work.
+#
+# This split makes it clear what data is passed to the other process and what
+# is executed in which process.
+#
+# To set up a minimal new kind of Job, e.g. YourJob:
+#
+# 1. Create a YourJob class, inheriting from Job.
+# 2. Create a YourChildJob class, inheriting from ChildJob.
+# 3. Implement YourJob.create_child_job() and YourJob.parent_complete().
+# 4. Implement YourChildJob.child_process().
+#
+# A Job instance and its ChildJob share a message queue. You may send custom
+# messages to the main process using YourChildJob.send_message(). Such messages
+# must be processed in YourJob.handle_message(), which you will also need to
+# override for this purpose.
#
# Args:
# scheduler (Scheduler): The scheduler
@@ -126,8 +147,18 @@ class Job():
self._tries += 1
self._parent_start_listening()
+ child_job = self.create_child_job( # pylint: disable=assignment-from-no-return
+ self._scheduler,
+ self.action_name,
+ self._logfile,
+ self._max_retries,
+ self._tries,
+ self._message_unique_id,
+ self._task_id,
+ )
+
# Spawn the process
- self._process = Process(target=self._child_action, args=[self._queue])
+ self._process = Process(target=child_job.child_action, args=[self._queue])
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main
@@ -306,17 +337,6 @@ class Job():
self._scheduler.context.message(
Message(unique_id, message_type, message, **kwargs))
- # send_message()
- #
- # To be called from inside Job.child_process() implementations
- # to send messages to the main process during processing.
- #
- # These messages will be processed by the class's Job.handle_message()
- # implementation.
- #
- def send_message(self, message_type, message):
- self._queue.put(_Envelope(message_type, message))
-
#######################################################
# Abstract Methods #
#######################################################
@@ -339,8 +359,8 @@ class Job():
# parent_complete()
#
- # This will be executed after the job finishes, and is expected to
- # pass the result to the main thread.
+ # This will be executed in the main process after the job finishes, and is
+ # expected to pass the result to the main thread.
#
# Args:
# status (JobStatus): The job exit status
@@ -350,44 +370,28 @@ class Job():
raise ImplError("Job '{kind}' does not implement parent_complete()"
.format(kind=type(self).__name__))
- # child_process()
+ # create_child_job()
#
- # This will be executed after fork(), and is intended to perform
- # the job's task.
+ # Called by a Job instance to create a child job.
#
- # Returns:
- # (any): A (simple!) object to be returned to the main thread
- # as the result.
- #
- def child_process(self):
- raise ImplError("Job '{kind}' does not implement child_process()"
- .format(kind=type(self).__name__))
-
- # child_process_data()
+ # The child job object is an instance of a subclass of ChildJob.
#
- # Abstract method to retrieve additional data that should be
- # returned to the parent process. Note that the job result is
- # retrieved independently.
+ # The child job object's child_process() method will be executed in another
+ # process, so that work is done in parallel. See the documentation for the
+ # Job class for more information on this relationship.
#
- # Values can later be retrieved in Job.child_data.
+ # This method must be overridden by Job subclasses.
#
# Returns:
- # (dict) A dict containing values to be reported to the main process
+ # (ChildJob): An instance of a subclass of ChildJob.
#
- def child_process_data(self):
- return {}
+ def create_child_job(self, *args, **kwargs):
+ raise ImplError("Job '{kind}' does not implement create_child_job()"
+ .format(kind=type(self).__name__))
#######################################################
# Local Private Methods #
#######################################################
- #
- # Methods prefixed with the word 'child' take place in the child process
- #
- # Methods prefixed with the word 'parent' take place in the parent process
- #
- # Other methods can be called in both child or parent processes
- #
- #######################################################
# _parent_shutdown()
#
@@ -520,14 +524,111 @@ class Job():
self._scheduler.loop.remove_reader(self._queue._reader.fileno())
self._listening = False
- # _child_action()
+
+# ChildJob()
+#
+# The ChildJob object represents the part of a parallel task that will run in a
+# separate process. It has a close relationship with the parent Job that
+# created it.
+#
+# See the documentation of the Job class for more on their relationship, and
+# how to set up a (Job, ChildJob) pair.
+#
+# The args below are passed from the parent Job to the ChildJob.
+#
+# Args:
+# scheduler (Scheduler): The scheduler.
+# action_name (str): The queue action name.
+# logfile (str): A template string that points to the logfile
+# that should be used - should contain {pid}.
+# max_retries (int): The maximum number of retries.
+# tries (int): The number of retries so far.
+# message_unique_id (int): None, or the id to be supplied to the Message() constructor.
+# task_id (int): None, or the plugin identifier for this job.
+#
+class ChildJob():
+
+ def __init__(
+ self, scheduler, action_name, logfile, max_retries, tries, message_unique_id, task_id):
+
+ self.action_name = action_name
+
+ self._scheduler = scheduler
+ self._logfile = logfile
+ self._max_retries = max_retries
+ self._tries = tries
+ self._message_unique_id = message_unique_id
+ self._task_id = task_id
+
+ self._queue = None
+
+ # message():
+ #
+ # Logs a message, this will be logged in the task's logfile and
+ # conditionally also be sent to the frontend.
+ #
+ # Args:
+ # message_type (MessageType): The type of message to send
+ # message (str): The message
+ # kwargs: Remaining Message() constructor arguments, note that you can
+ # override 'unique_id' this way.
+ #
+ def message(self, message_type, message, **kwargs):
+ kwargs['scheduler'] = True
+ unique_id = self._message_unique_id
+ if "unique_id" in kwargs:
+ unique_id = kwargs["unique_id"]
+ del kwargs["unique_id"]
+ self._scheduler.context.message(
+ Message(unique_id, message_type, message, **kwargs))
+
+ # send_message()
+ #
+    # Send a custom message to the main process during processing; these
+    # messages will be processed by the Job.handle_message() implementation.
+ #
+ def send_message(self, message_type, message):
+ self._queue.put(_Envelope(message_type, message))
+
+ #######################################################
+ # Abstract Methods #
+ #######################################################
+
+ # child_process()
+ #
+ # This will be executed after fork(), and is intended to perform
+ # the job's task.
+ #
+ # Returns:
+ # (any): A (simple!) object to be returned to the parent Job running
+ # in the main process. This is taken as the result of the Job.
+ #
+ def child_process(self):
+ raise ImplError("ChildJob '{kind}' does not implement child_process()"
+ .format(kind=type(self).__name__))
+
+ # child_process_data()
+ #
+ # Abstract method to retrieve additional data that should be
+ # returned to the parent process. Note that the job result is
+ # retrieved independently.
+ #
+ # Values can later be retrieved in Job.child_data.
+ #
+ # Returns:
+ # (dict) A dict containing values to be reported to the main process
+ #
+ def child_process_data(self):
+ return {}
+
+ # child_action()
#
# Perform the action in the child process, this calls the action_cb.
#
# Args:
# queue (multiprocessing.Queue): The message queue for IPC
#
- def _child_action(self, queue):
+ def child_action(self, queue):
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
@@ -628,6 +729,10 @@ class Job():
# is already busy in sys.exit()
self._child_shutdown(RC_OK)
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+
# _child_send_error()
#
# Sends an error to the main process through the message queue