summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-04-09 13:13:58 +0100
committerAngelos Evripiotis <jevripiotis@bloomberg.net>2019-05-23 15:44:53 +0100
commitbbbda5482de1ddfc05960cefa1b755ea88b12337 (patch)
treec90e59d4b82de846cbc61a199c3bde5969b87360 /src
parente5fcb69b0b4137933940bfd4d569d7e9d9d0c27e (diff)
downloadbuildstream-bbbda5482de1ddfc05960cefa1b755ea88b12337.tar.gz
spawn: split jobs into parent and child
Make it a bit clearer what happens in which process by splitting out a 'ChildJob', which encapsulates the work that will happen in the child process. This also makes it possible to control what will be transferred to the child process, which is very useful for adding support for the 'spawn' method of creating child processes as opposed to the 'fork' method.
Diffstat (limited to 'src')
-rw-r--r--src/buildstream/_scheduler/jobs/cachesizejob.py17
-rw-r--r--src/buildstream/_scheduler/jobs/cleanupjob.py23
-rw-r--r--src/buildstream/_scheduler/jobs/elementjob.py19
-rw-r--r--src/buildstream/_scheduler/jobs/job.py158
4 files changed, 153 insertions, 64 deletions
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py
index 5f27b7fc1..55711da01 100644
--- a/src/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/src/buildstream/_scheduler/jobs/cachesizejob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job, JobStatus
+from .job import Job, JobStatus, ChildJob
class CacheSizeJob(Job):
@@ -27,9 +27,6 @@ class CacheSizeJob(Job):
context = self._scheduler.context
self._casquota = context.get_casquota()
- def child_process(self):
- return self._casquota.compute_cache_size()
-
def parent_complete(self, status, result):
if status == JobStatus.OK:
self._casquota.set_cache_size(result)
@@ -37,5 +34,17 @@ class CacheSizeJob(Job):
if self._complete_cb:
self._complete_cb(status, result)
+ def create_child_job(self):
+        return ChildCacheSizeJob(self._scheduler.context.get_casquota())
+
+
+class ChildCacheSizeJob(ChildJob):
+ def __init__(self, casquota):
+ super().__init__()
+ self._casquota = casquota
+
+ def child_process(self):
+ return self._casquota.compute_cache_size()
+
def child_process_data(self):
return {}
diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py
index 4764b30b3..325b9baa9 100644
--- a/src/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/src/buildstream/_scheduler/jobs/cleanupjob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job, JobStatus
+from .job import Job, JobStatus, ChildJob
class CleanupJob(Job):
@@ -27,12 +27,6 @@ class CleanupJob(Job):
context = self._scheduler.context
self._casquota = context.get_casquota()
- def child_process(self):
- def progress():
- self.send_message('update-cache-size',
- self._casquota.get_cache_size())
- return self._casquota.clean(progress)
-
def handle_message(self, message_type, message):
# Update the cache size in the main process as we go,
# this provides better feedback in the UI.
@@ -48,3 +42,18 @@ class CleanupJob(Job):
if self._complete_cb:
self._complete_cb(status, result)
+
+ def create_child_job(self):
+        return ChildCleanupJob(self._scheduler.context.get_casquota())
+
+
+class ChildCleanupJob(ChildJob):
+ def __init__(self, casquota):
+ super().__init__()
+ self._casquota = casquota
+
+ def child_process(self):
+ def progress():
+ self.send_message('update-cache-size',
+ self._casquota.get_cache_size())
+ return self._casquota.clean(progress)
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index d6aa81567..29ff2b127 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -20,7 +20,7 @@ from ruamel import yaml
from ..._message import MessageType
-from .job import Job
+from .job import Job, ChildJob
# ElementJob()
@@ -80,6 +80,20 @@ class ElementJob(Job):
def element(self):
return self._element
+ def parent_complete(self, status, result):
+ self._complete_cb(self, self._element, status, self._result)
+
+ def create_child_job(self):
+ return ChildElementJob(self._element, self._action_cb)
+
+
+class ChildElementJob(ChildJob):
+
+ def __init__(self, element, action_cb):
+ super().__init__()
+ self._element = element
+ self._action_cb = action_cb
+
def child_process(self):
# Print the element's environment at the beginning of any element's log file.
@@ -94,9 +108,6 @@ class ElementJob(Job):
# Run the action
return self._action_cb(self._element)
- def parent_complete(self, status, result):
- self._complete_cb(self, self._element, status, self._result)
-
def child_process_data(self):
data = {}
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index ee01a92dc..a899b8952 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -76,8 +76,8 @@ class Process(multiprocessing.Process):
# Job()
#
# The Job object represents a parallel task. When calling Job.spawn(),
-# the given `Job.child_process()` will be called in parallel to the
-# calling process, and `Job.parent_complete()` will be called with the
+# the given `Job.create_child_job()` will be called to run a job in parallel to
+# the calling process, and `Job.parent_complete()` will be called with the
# action result in the calling process when the job completes.
#
# Args:
@@ -126,8 +126,25 @@ class Job():
self._tries += 1
self._parent_start_listening()
+ child_job = self.create_child_job()
+
+ child_job.setup(
+ self.action_name,
+ self._scheduler,
+ self._queue,
+ self._max_retries,
+ self._tries,
+ self._logfile,
+ self._message_unique_id,
+ self._task_id,
+ )
+
# Spawn the process
- self._process = Process(target=self._child_action, args=[self._queue])
+ #
+ # We can call a private method in the child job because these two
+ # classes go together.
+ #
+ self._process = Process(target=child_job._child_action)
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main
@@ -306,17 +323,6 @@ class Job():
self._scheduler.context.message(
Message(unique_id, message_type, message, **kwargs))
- # send_message()
- #
- # To be called from inside Job.child_process() implementations
- # to send messages to the main process during processing.
- #
- # These messages will be processed by the class's Job.handle_message()
- # implementation.
- #
- def send_message(self, message_type, message):
- self._queue.put(_Envelope(message_type, message))
-
#######################################################
# Abstract Methods #
#######################################################
@@ -339,8 +345,8 @@ class Job():
# parent_complete()
#
- # This will be executed after the job finishes, and is expected to
- # pass the result to the main thread.
+ # This will be executed in the main process after the job finishes, and is
+ # expected to pass the result to the main thread.
#
# Args:
# status (JobStatus): The job exit status
@@ -350,44 +356,22 @@ class Job():
raise ImplError("Job '{kind}' does not implement parent_complete()"
.format(kind=type(self).__name__))
- # child_process()
- #
- # This will be executed after fork(), and is intended to perform
- # the job's task.
- #
- # Returns:
- # (any): A (simple!) object to be returned to the main thread
- # as the result.
- #
- def child_process(self):
- raise ImplError("Job '{kind}' does not implement child_process()"
- .format(kind=type(self).__name__))
-
- # child_process_data()
+ # create_child_job()
#
- # Abstract method to retrieve additional data that should be
- # returned to the parent process. Note that the job result is
- # retrieved independently.
+ # Called by Job to create an object that will be run in a child process.
#
- # Values can later be retrieved in Job.child_data.
+ # This must be overridden by Job subclasses.
#
# Returns:
- # (dict) A dict containing values to be reported to the main process
+ # (ChildJob): An instance of a ChildJob subclass
#
- def child_process_data(self):
- return {}
+ def create_child_job(self):
+ raise ImplError("Job '{kind}' does not implement create_child_job()"
+ .format(kind=type(self).__name__))
#######################################################
# Local Private Methods #
#######################################################
- #
- # Methods prefixed with the word 'child' take place in the child process
- #
- # Methods prefixed with the word 'parent' take place in the parent process
- #
- # Other methods can be called in both child or parent processes
- #
- #######################################################
# _parent_shutdown()
#
@@ -520,6 +504,85 @@ class Job():
self._scheduler.loop.remove_reader(self._queue._reader.fileno())
self._listening = False
+
+class ChildJob():
+
+ def setup(
+ self, action_name, scheduler, queue, max_retries, tries, logfile, message_unique_id, task_id):
+
+ self.action_name = action_name
+ self._scheduler = scheduler
+ self._queue = queue
+ self._max_retries = max_retries
+ self._tries = tries
+ self._retry_flag = True
+ self._logfile = logfile
+ self._message_unique_id = message_unique_id
+ self._task_id = task_id
+
+ # message():
+ #
+ # Logs a message, this will be logged in the task's logfile and
+ # conditionally also be sent to the frontend.
+ #
+ # Args:
+ # message_type (MessageType): The type of message to send
+ # message (str): The message
+ # kwargs: Remaining Message() constructor arguments, note that you can
+ # override 'unique_id' this way.
+ #
+ def message(self, message_type, message, **kwargs):
+ kwargs['scheduler'] = True
+ unique_id = self._message_unique_id
+ if "unique_id" in kwargs:
+ unique_id = kwargs["unique_id"]
+ del kwargs["unique_id"]
+ self._scheduler.context.message(
+ Message(unique_id, message_type, message, **kwargs))
+
+ # send_message()
+ #
+ # These messages will be processed by the class's Job.handle_message()
+ # implementation.
+ #
+ def send_message(self, message_type, message):
+ self._queue.put(_Envelope(message_type, message))
+
+ #######################################################
+ # Abstract Methods #
+ #######################################################
+
+ # child_process()
+ #
+ # This will be executed after fork(), and is intended to perform
+ # the job's task.
+ #
+ # Returns:
+ # (any): A (simple!) object to be returned to the main thread
+ # as the result.
+ #
+ def child_process(self):
+ raise ImplError("Job '{kind}' does not implement child_process()"
+ .format(kind=type(self).__name__))
+
+ # child_process_data()
+ #
+ # Abstract method to retrieve additional data that should be
+ # returned to the parent process. Note that the job result is
+ # retrieved independently.
+ #
+ # Values can later be retrieved in Job.child_data.
+ #
+ # Returns:
+ # (dict) A dict containing values to be reported to the main process
+ #
+ def child_process_data(self):
+ return {}
+
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+
# _child_action()
#
# Perform the action in the child process, this calls the action_cb.
@@ -527,7 +590,4 @@ class Job():
-    # Args:
-    #    queue (multiprocessing.Queue): The message queue for IPC
-    #
-    def _child_action(self, queue):
+    def _child_action(self):
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
@@ -541,11 +604,8 @@ class Job():
signal.signal(sig, signal.SIG_DFL)
signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
- # Assign the queue we passed across the process boundaries
- #
# Set the global message handler in this child
# process to forward messages to the parent process
- self._queue = queue
self._scheduler.context.set_message_handler(self._child_message_handler)
starttime = datetime.datetime.now()