summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-04-09 13:13:58 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-06-05 14:13:10 +0000
commitdad39f94ad147101efd166ddd6eadf6cadd8e806 (patch)
tree52c146eb3af9e34869505f5b7a5ac319ceca6fa8
parent64417815b89ebb625598aa647b2a014eaa5aa790 (diff)
downloadbuildstream-dad39f94ad147101efd166ddd6eadf6cadd8e806.tar.gz
_scheduler/jobs: split jobs into parent and child
Make it clearer what happens in which process by splitting out a 'ChildJob', which encapsulates the work that happens in the child process. This also makes it possible to control what is transferred to the child process. This is very useful for adding support for the 'spawn' method of creating child processes as opposed to the 'fork' method.
-rw-r--r--src/buildstream/_scheduler/jobs/cachesizejob.py17
-rw-r--r--src/buildstream/_scheduler/jobs/cleanupjob.py23
-rw-r--r--src/buildstream/_scheduler/jobs/elementjob.py18
-rw-r--r--src/buildstream/_scheduler/jobs/job.py199
4 files changed, 195 insertions, 62 deletions
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py
index 81c012895..ed1cc4131 100644
--- a/src/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/src/buildstream/_scheduler/jobs/cachesizejob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job, JobStatus
+from .job import Job, JobStatus, ChildJob
class CacheSizeJob(Job):
@@ -27,12 +27,21 @@ class CacheSizeJob(Job):
context = self._scheduler.context
self._casquota = context.get_casquota()
- def child_process(self):
- return self._casquota.compute_cache_size()
-
def parent_complete(self, status, result):
if status == JobStatus.OK:
self._casquota.set_cache_size(result)
if self._complete_cb:
self._complete_cb(status, result)
+
+ def create_child_job(self, *args, **kwargs):
+ return ChildCacheSizeJob(*args, casquota=self._scheduler.context._casquota, **kwargs)
+
+
+class ChildCacheSizeJob(ChildJob):
+ def __init__(self, *args, casquota, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._casquota = casquota
+
+ def child_process(self):
+ return self._casquota.compute_cache_size()
diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py
index 4764b30b3..672e784bc 100644
--- a/src/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/src/buildstream/_scheduler/jobs/cleanupjob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job, JobStatus
+from .job import Job, JobStatus, ChildJob
class CleanupJob(Job):
@@ -27,12 +27,6 @@ class CleanupJob(Job):
context = self._scheduler.context
self._casquota = context.get_casquota()
- def child_process(self):
- def progress():
- self.send_message('update-cache-size',
- self._casquota.get_cache_size())
- return self._casquota.clean(progress)
-
def handle_message(self, message_type, message):
# Update the cache size in the main process as we go,
# this provides better feedback in the UI.
@@ -48,3 +42,18 @@ class CleanupJob(Job):
if self._complete_cb:
self._complete_cb(status, result)
+
+ def create_child_job(self, *args, **kwargs):
+ return ChildCleanupJob(*args, casquota=self._scheduler.context.get_casquota(), **kwargs)
+
+
+class ChildCleanupJob(ChildJob):
+ def __init__(self, *args, casquota, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._casquota = casquota
+
+ def child_process(self):
+ def progress():
+ self.send_message('update-cache-size',
+ self._casquota.get_cache_size())
+ return self._casquota.clean(progress)
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index d6aa81567..2be0aa8f9 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -20,7 +20,7 @@ from ruamel import yaml
from ..._message import MessageType
-from .job import Job
+from .job import Job, ChildJob
# ElementJob()
@@ -80,6 +80,19 @@ class ElementJob(Job):
def element(self):
return self._element
+ def parent_complete(self, status, result):
+ self._complete_cb(self, self._element, status, self._result)
+
+ def create_child_job(self, *args, **kwargs):
+ return ChildElementJob(*args, element=self._element, action_cb=self._action_cb, **kwargs)
+
+
+class ChildElementJob(ChildJob):
+ def __init__(self, *args, element, action_cb, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._element = element
+ self._action_cb = action_cb
+
def child_process(self):
# Print the element's environment at the beginning of any element's log file.
@@ -94,9 +107,6 @@ class ElementJob(Job):
# Run the action
return self._action_cb(self._element)
- def parent_complete(self, status, result):
- self._complete_cb(self, self._element, status, self._result)
-
def child_process_data(self):
data = {}
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index ee01a92dc..df9d3a8bc 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -75,10 +75,31 @@ class Process(multiprocessing.Process):
# Job()
#
-# The Job object represents a parallel task. When calling Job.spawn(),
-# the given `Job.child_process()` will be called in parallel to the
-# calling process, and `Job.parent_complete()` will be called with the
-# action result in the calling process when the job completes.
+# The Job object represents a task that will run in parallel to the main
+# process. It has some methods that are not implemented - they are meant for
+# you to implement in a subclass.
+#
+# It has a close relationship with the ChildJob class, and it can be considered
+# a two part solution:
+#
+# 1. A Job instance, which will create a ChildJob instance and arrange for
+# childjob.child_process() to be executed in another process.
+# 2. The created ChildJob instance, which does the actual work.
+#
+# This split makes it clear what data is passed to the other process and what
+# is executed in which process.
+#
+# To set up a minimal new kind of Job, e.g. YourJob:
+#
+# 1. Create a YourJob class, inheriting from Job.
+# 2. Create a YourChildJob class, inheriting from ChildJob.
+# 3. Implement YourJob.create_child_job() and YourJob.parent_complete().
+# 4. Implement YourChildJob.child_process().
+#
+# A Job instance and its ChildJob share a message queue. You may send custom
+# messages to the main process using YourChildJob.send_message(). Such messages
+# must be processed in YourJob.handle_message(), which you will also need to
+# override for this purpose.
#
# Args:
# scheduler (Scheduler): The scheduler
@@ -126,8 +147,18 @@ class Job():
self._tries += 1
self._parent_start_listening()
+ child_job = self.create_child_job( # pylint: disable=assignment-from-no-return
+ self._scheduler,
+ self.action_name,
+ self._logfile,
+ self._max_retries,
+ self._tries,
+ self._message_unique_id,
+ self._task_id,
+ )
+
# Spawn the process
- self._process = Process(target=self._child_action, args=[self._queue])
+ self._process = Process(target=child_job.child_action, args=[self._queue])
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main
@@ -306,17 +337,6 @@ class Job():
self._scheduler.context.message(
Message(unique_id, message_type, message, **kwargs))
- # send_message()
- #
- # To be called from inside Job.child_process() implementations
- # to send messages to the main process during processing.
- #
- # These messages will be processed by the class's Job.handle_message()
- # implementation.
- #
- def send_message(self, message_type, message):
- self._queue.put(_Envelope(message_type, message))
-
#######################################################
# Abstract Methods #
#######################################################
@@ -339,8 +359,8 @@ class Job():
# parent_complete()
#
- # This will be executed after the job finishes, and is expected to
- # pass the result to the main thread.
+ # This will be executed in the main process after the job finishes, and is
+ # expected to pass the result to the main thread.
#
# Args:
# status (JobStatus): The job exit status
@@ -350,44 +370,28 @@ class Job():
raise ImplError("Job '{kind}' does not implement parent_complete()"
.format(kind=type(self).__name__))
- # child_process()
+ # create_child_job()
#
- # This will be executed after fork(), and is intended to perform
- # the job's task.
+ # Called by a Job instance to create a child job.
#
- # Returns:
- # (any): A (simple!) object to be returned to the main thread
- # as the result.
- #
- def child_process(self):
- raise ImplError("Job '{kind}' does not implement child_process()"
- .format(kind=type(self).__name__))
-
- # child_process_data()
+ # The child job object is an instance of a subclass of ChildJob.
#
- # Abstract method to retrieve additional data that should be
- # returned to the parent process. Note that the job result is
- # retrieved independently.
+ # The child job object's child_process() method will be executed in another
+ # process, so that work is done in parallel. See the documentation for the
+ # Job class for more information on this relationship.
#
- # Values can later be retrieved in Job.child_data.
+ # This method must be overridden by Job subclasses.
#
# Returns:
- # (dict) A dict containing values to be reported to the main process
+ # (ChildJob): An instance of a subclass of ChildJob.
#
- def child_process_data(self):
- return {}
+ def create_child_job(self, *args, **kwargs):
+ raise ImplError("Job '{kind}' does not implement create_child_job()"
+ .format(kind=type(self).__name__))
#######################################################
# Local Private Methods #
#######################################################
- #
- # Methods prefixed with the word 'child' take place in the child process
- #
- # Methods prefixed with the word 'parent' take place in the parent process
- #
- # Other methods can be called in both child or parent processes
- #
- #######################################################
# _parent_shutdown()
#
@@ -520,14 +524,111 @@ class Job():
self._scheduler.loop.remove_reader(self._queue._reader.fileno())
self._listening = False
- # _child_action()
+
+# ChildJob()
+#
+# The ChildJob object represents the part of a parallel task that will run in a
+# separate process. It has a close relationship with the parent Job that
+# created it.
+#
+# See the documentation of the Job class for more on their relationship, and
+# how to set up a (Job, ChildJob) pair.
+#
+# The args below are passed from the parent Job to the ChildJob.
+#
+# Args:
+# scheduler (Scheduler): The scheduler.
+# action_name (str): The queue action name.
+# logfile (str): A template string that points to the logfile
+# that should be used - should contain {pid}.
+# max_retries (int): The maximum number of retries.
+# tries (int): The number of retries so far.
+# message_unique_id (int): None, or the id to be supplied to the Message() constructor.
+# task_id (int): None, or the plugin identifier for this job.
+#
+class ChildJob():
+
+ def __init__(
+ self, scheduler, action_name, logfile, max_retries, tries, message_unique_id, task_id):
+
+ self.action_name = action_name
+
+ self._scheduler = scheduler
+ self._logfile = logfile
+ self._max_retries = max_retries
+ self._tries = tries
+ self._message_unique_id = message_unique_id
+ self._task_id = task_id
+
+ self._queue = None
+
+ # message():
+ #
+ # Logs a message, this will be logged in the task's logfile and
+ # conditionally also be sent to the frontend.
+ #
+ # Args:
+ # message_type (MessageType): The type of message to send
+ # message (str): The message
+ # kwargs: Remaining Message() constructor arguments, note that you can
+ # override 'unique_id' this way.
+ #
+ def message(self, message_type, message, **kwargs):
+ kwargs['scheduler'] = True
+ unique_id = self._message_unique_id
+ if "unique_id" in kwargs:
+ unique_id = kwargs["unique_id"]
+ del kwargs["unique_id"]
+ self._scheduler.context.message(
+ Message(unique_id, message_type, message, **kwargs))
+
+ # send_message()
+ #
+ # These messages will be processed by the Job.handle_message()
+ # implementation.
+ #
+ def send_message(self, message_type, message):
+ self._queue.put(_Envelope(message_type, message))
+
+ #######################################################
+ # Abstract Methods #
+ #######################################################
+
+ # child_process()
+ #
+ # This will be executed after fork(), and is intended to perform
+ # the job's task.
+ #
+ # Returns:
+ # (any): A (simple!) object to be returned to the parent Job running
+ # in the main process. This is taken as the result of the Job.
+ #
+ def child_process(self):
+ raise ImplError("ChildJob '{kind}' does not implement child_process()"
+ .format(kind=type(self).__name__))
+
+ # child_process_data()
+ #
+ # Abstract method to retrieve additional data that should be
+ # returned to the parent process. Note that the job result is
+ # retrieved independently.
+ #
+ # Values can later be retrieved in Job.child_data.
+ #
+ # Returns:
+ # (dict) A dict containing values to be reported to the main process
+ #
+ def child_process_data(self):
+ return {}
+
+ # child_action()
#
# Perform the action in the child process, this calls the action_cb.
#
# Args:
# queue (multiprocessing.Queue): The message queue for IPC
#
- def _child_action(self, queue):
+ def child_action(self, queue):
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
@@ -628,6 +729,10 @@ class Job():
# is already busy in sys.exit()
self._child_shutdown(RC_OK)
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+
# _child_send_error()
#
# Sends an error to the main process through the message queue