-rw-r--r--   src/buildstream/_scheduler/jobs/cachesizejob.py    17
-rw-r--r--   src/buildstream/_scheduler/jobs/cleanupjob.py      23
-rw-r--r--   src/buildstream/_scheduler/jobs/elementjob.py      19
-rw-r--r--   src/buildstream/_scheduler/jobs/job.py            158
4 files changed, 153 insertions(+), 64 deletions(-)
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py
index 5f27b7fc1..55711da01 100644
--- a/src/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/src/buildstream/_scheduler/jobs/cachesizejob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job, JobStatus
+from .job import Job, JobStatus, ChildJob
class CacheSizeJob(Job):
@@ -27,9 +27,6 @@ class CacheSizeJob(Job):
context = self._scheduler.context
self._casquota = context.get_casquota()
- def child_process(self):
- return self._casquota.compute_cache_size()
-
def parent_complete(self, status, result):
if status == JobStatus.OK:
self._casquota.set_cache_size(result)
@@ -37,5 +34,17 @@ class CacheSizeJob(Job):
if self._complete_cb:
self._complete_cb(status, result)
+ def create_child_job(self):
+        return ChildCacheSizeJob(self._scheduler.context.get_casquota())
+
+
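+# ChildCacheSizeJob()
+#
+# The part of a CacheSizeJob which runs in the child process: it computes
+# the cache size and returns it to the main process as the job result.
+#
+# Args:
+#    casquota: The cas quota object used to compute the cache size
+#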
+class ChildCacheSizeJob(ChildJob):
+ def __init__(self, casquota):
+ super().__init__()
+ self._casquota = casquota
+
+ def child_process(self):
+ return self._casquota.compute_cache_size()
+
def child_process_data(self):
return {}
diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py
index 4764b30b3..325b9baa9 100644
--- a/src/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/src/buildstream/_scheduler/jobs/cleanupjob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job, JobStatus
+from .job import Job, JobStatus, ChildJob
class CleanupJob(Job):
@@ -27,12 +27,6 @@ class CleanupJob(Job):
context = self._scheduler.context
self._casquota = context.get_casquota()
- def child_process(self):
- def progress():
- self.send_message('update-cache-size',
- self._casquota.get_cache_size())
- return self._casquota.clean(progress)
-
def handle_message(self, message_type, message):
# Update the cache size in the main process as we go,
# this provides better feedback in the UI.
@@ -48,3 +42,18 @@ class CleanupJob(Job):
if self._complete_cb:
self._complete_cb(status, result)
+
+ def create_child_job(self):
+        return ChildCleanupJob(self._scheduler.context.get_casquota())
+
+
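+# ChildCleanupJob()
+#
+# The part of a CleanupJob which runs in the child process: it runs the
+# cache cleanup, sending 'update-cache-size' messages back to the main
+# process as it goes so that the UI can report progress.
+#
+# Args:
+#    casquota: The cas quota object to clean
+#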
+class ChildCleanupJob(ChildJob):
+ def __init__(self, casquota):
+ super().__init__()
+ self._casquota = casquota
+
+ def child_process(self):
+ def progress():
+ self.send_message('update-cache-size',
+ self._casquota.get_cache_size())
+ return self._casquota.clean(progress)
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index d6aa81567..29ff2b127 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -20,7 +20,7 @@ from ruamel import yaml
from ..._message import MessageType
-from .job import Job
+from .job import Job, ChildJob
# ElementJob()
@@ -80,6 +80,20 @@ class ElementJob(Job):
def element(self):
return self._element
+ def parent_complete(self, status, result):
+ self._complete_cb(self, self._element, status, self._result)
+
+ def create_child_job(self):
+ return ChildElementJob(self._element, self._action_cb)
+
+
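+# ChildElementJob()
+#
+# The part of an ElementJob which runs in the child process: it logs the
+# element's environment at the start of the log file and then runs the
+# action callback on the element, returning its result to the main process.
+#
+# Args:
+#    element (Element): The element to process
+#    action_cb (callable): The action callback to run on the element
+#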
+class ChildElementJob(ChildJob):
+
+ def __init__(self, element, action_cb):
+ super().__init__()
+ self._element = element
+ self._action_cb = action_cb
+
def child_process(self):
# Print the element's environment at the beginning of any element's log file.
@@ -94,9 +108,6 @@ class ElementJob(Job):
# Run the action
return self._action_cb(self._element)
- def parent_complete(self, status, result):
- self._complete_cb(self, self._element, status, self._result)
-
def child_process_data(self):
data = {}
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index ee01a92dc..a899b8952 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -76,8 +76,8 @@ class Process(multiprocessing.Process):
# Job()
#
# The Job object represents a parallel task. When calling Job.spawn(),
-# the given `Job.child_process()` will be called in parallel to the
-# calling process, and `Job.parent_complete()` will be called with the
+# the `ChildJob` created by `Job.create_child_job()` will be run in parallel
+# to the calling process, and `Job.parent_complete()` will be called with the
# action result in the calling process when the job completes.
#
# Args:
@@ -126,8 +126,25 @@ class Job():
self._tries += 1
self._parent_start_listening()
+ child_job = self.create_child_job()
+
+ child_job.setup(
+ self.action_name,
+ self._scheduler,
+ self._queue,
+ self._max_retries,
+ self._tries,
+ self._logfile,
+ self._message_unique_id,
+ self._task_id,
+ )
+
# Spawn the process
- self._process = Process(target=self._child_action, args=[self._queue])
+        #
+        # Note: it is ok to call the private _child_action() method here,
+        # because Job and ChildJob are tightly coupled classes.
+        #
+ self._process = Process(target=child_job._child_action)
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main
@@ -306,17 +323,6 @@ class Job():
self._scheduler.context.message(
Message(unique_id, message_type, message, **kwargs))
- # send_message()
- #
- # To be called from inside Job.child_process() implementations
- # to send messages to the main process during processing.
- #
- # These messages will be processed by the class's Job.handle_message()
- # implementation.
- #
- def send_message(self, message_type, message):
- self._queue.put(_Envelope(message_type, message))
-
#######################################################
# Abstract Methods #
#######################################################
@@ -339,8 +345,8 @@ class Job():
# parent_complete()
#
- # This will be executed after the job finishes, and is expected to
- # pass the result to the main thread.
+ # This will be executed in the main process after the job finishes, and is
+ # expected to pass the result to the main thread.
#
# Args:
# status (JobStatus): The job exit status
@@ -350,44 +356,22 @@ class Job():
raise ImplError("Job '{kind}' does not implement parent_complete()"
.format(kind=type(self).__name__))
- # child_process()
- #
- # This will be executed after fork(), and is intended to perform
- # the job's task.
- #
- # Returns:
- # (any): A (simple!) object to be returned to the main thread
- # as the result.
- #
- def child_process(self):
- raise ImplError("Job '{kind}' does not implement child_process()"
- .format(kind=type(self).__name__))
-
- # child_process_data()
+ # create_child_job()
#
- # Abstract method to retrieve additional data that should be
- # returned to the parent process. Note that the job result is
- # retrieved independently.
+ # Called by Job to create an object that will be run in a child process.
#
- # Values can later be retrieved in Job.child_data.
+ # This must be overridden by Job subclasses.
#
# Returns:
- # (dict) A dict containing values to be reported to the main process
+ # (ChildJob): An instance of a ChildJob subclass
#
- def child_process_data(self):
- return {}
+ def create_child_job(self):
+ raise ImplError("Job '{kind}' does not implement create_child_job()"
+ .format(kind=type(self).__name__))
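+
+    # A minimal sketch of the intended override pattern, for illustration
+    # only (FooJob, ChildFooJob and do_the_work are not part of this
+    # module):
+    #
+    #     class ChildFooJob(ChildJob):
+    #         def child_process(self):
+    #             return do_the_work()      # runs in the child process
+    #
+    #     class FooJob(Job):
+    #         def create_child_job(self):   # runs in the main process
+    #             return ChildFooJob()
+    #
+    #         def parent_complete(self, status, result):
+    #             pass                      # handle the result in the main process
+    #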
#######################################################
# Local Private Methods #
#######################################################
- #
- # Methods prefixed with the word 'child' take place in the child process
- #
- # Methods prefixed with the word 'parent' take place in the parent process
- #
- # Other methods can be called in both child or parent processes
- #
- #######################################################
# _parent_shutdown()
#
@@ -520,6 +504,85 @@ class Job():
self._scheduler.loop.remove_reader(self._queue._reader.fileno())
self._listening = False
+
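+# ChildJob()
+#
+# The part of a Job which runs in the child process. It is created by the
+# Job's create_child_job() implementation and configured with setup()
+# before the child process is spawned. Subclasses implement child_process()
+# to perform the actual work, and may override child_process_data() to
+# report additional data back to the main process.
+#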
+class ChildJob():
+
+ def setup(
+ self, action_name, scheduler, queue, max_retries, tries, logfile, message_unique_id, task_id):
+
+ self.action_name = action_name
+ self._scheduler = scheduler
+ self._queue = queue
+ self._max_retries = max_retries
+ self._tries = tries
+ self._retry_flag = True
+ self._logfile = logfile
+ self._message_unique_id = message_unique_id
+ self._task_id = task_id
+
+ # message():
+ #
+ # Logs a message, this will be logged in the task's logfile and
+ # conditionally also be sent to the frontend.
+ #
+ # Args:
+ # message_type (MessageType): The type of message to send
+ # message (str): The message
+ # kwargs: Remaining Message() constructor arguments, note that you can
+ # override 'unique_id' this way.
+ #
+ def message(self, message_type, message, **kwargs):
+ kwargs['scheduler'] = True
+ unique_id = self._message_unique_id
+ if "unique_id" in kwargs:
+ unique_id = kwargs["unique_id"]
+ del kwargs["unique_id"]
+ self._scheduler.context.message(
+ Message(unique_id, message_type, message, **kwargs))
+
+    # send_message()
+    #
+    # To be called from inside ChildJob.child_process() implementations
+    # to send messages to the main process during processing.
+    #
+    # These messages will be processed by the Job's handle_message()
+    # implementation in the main process.
+    #
+ def send_message(self, message_type, message):
+ self._queue.put(_Envelope(message_type, message))
+
+ #######################################################
+ # Abstract Methods #
+ #######################################################
+
+ # child_process()
+ #
+ # This will be executed after fork(), and is intended to perform
+ # the job's task.
+ #
+ # Returns:
+ # (any): A (simple!) object to be returned to the main thread
+ # as the result.
+ #
+ def child_process(self):
+ raise ImplError("Job '{kind}' does not implement child_process()"
+ .format(kind=type(self).__name__))
+
+ # child_process_data()
+ #
+ # Abstract method to retrieve additional data that should be
+ # returned to the parent process. Note that the job result is
+ # retrieved independently.
+ #
+ # Values can later be retrieved in Job.child_data.
+ #
+ # Returns:
+ # (dict) A dict containing values to be reported to the main process
+ #
+ def child_process_data(self):
+ return {}
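+
+    # For example, an override might return {'cache_size': ...} so that the
+    # main process can pick the value up from Job.child_data (the dict key
+    # here is illustrative only, not prescribed by this class).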
+
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+
# _child_action()
#
# Perform the action in the child process, this calls the action_cb.
@@ -527,7 +590,7 @@ class Job():
-    # Args:
-    #    queue (multiprocessing.Queue): The message queue for IPC
-    #
- def _child_action(self, queue):
+ def _child_action(self):
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
@@ -541,11 +604,8 @@ class Job():
signal.signal(sig, signal.SIG_DFL)
signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
- # Assign the queue we passed across the process boundaries
- #
# Set the global message handler in this child
# process to forward messages to the parent process
- self._queue = queue
self._scheduler.context.set_message_handler(self._child_message_handler)
starttime = datetime.datetime.now()