summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: bst-marge-bot <marge-bot@buildstream.build> 2019-06-05 14:40:18 +0000
committer: bst-marge-bot <marge-bot@buildstream.build> 2019-06-05 14:40:18 +0000
commit: f0288920fe4da689e8cd49ff48645f280c7fe08e (patch)
tree: 2bbcea96cdf0a29ae00d506ac2ea7536784562bb
parent: 1233899f7b151447f3a820d1c41e162beddf8236 (diff)
parent: dda569baf84db84631f860a550968a58cfea9e52 (diff)
download: buildstream-f0288920fe4da689e8cd49ff48645f280c7fe08e.tar.gz
Merge branch 'aevri/split_jobs_parent_child' into 'master'
Split ChildJob out from Job class. See merge request BuildStream/buildstream!1334
-rw-r--r-- src/buildstream/_scheduler/jobs/cachesizejob.py 18
-rw-r--r-- src/buildstream/_scheduler/jobs/cleanupjob.py 23
-rw-r--r-- src/buildstream/_scheduler/jobs/elementjob.py 18
-rw-r--r-- src/buildstream/_scheduler/jobs/job.py 232
4 files changed, 218 insertions, 73 deletions
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py
index 5f27b7fc1..ed1cc4131 100644
--- a/src/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/src/buildstream/_scheduler/jobs/cachesizejob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job, JobStatus
+from .job import Job, JobStatus, ChildJob
class CacheSizeJob(Job):
@@ -27,9 +27,6 @@ class CacheSizeJob(Job):
context = self._scheduler.context
self._casquota = context.get_casquota()
- def child_process(self):
- return self._casquota.compute_cache_size()
-
def parent_complete(self, status, result):
if status == JobStatus.OK:
self._casquota.set_cache_size(result)
@@ -37,5 +34,14 @@ class CacheSizeJob(Job):
if self._complete_cb:
self._complete_cb(status, result)
- def child_process_data(self):
- return {}
+ def create_child_job(self, *args, **kwargs):
+ return ChildCacheSizeJob(*args, casquota=self._scheduler.context._casquota, **kwargs)
+
+
+class ChildCacheSizeJob(ChildJob):
+ def __init__(self, *args, casquota, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._casquota = casquota
+
+ def child_process(self):
+ return self._casquota.compute_cache_size()
diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py
index 4764b30b3..672e784bc 100644
--- a/src/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/src/buildstream/_scheduler/jobs/cleanupjob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job, JobStatus
+from .job import Job, JobStatus, ChildJob
class CleanupJob(Job):
@@ -27,12 +27,6 @@ class CleanupJob(Job):
context = self._scheduler.context
self._casquota = context.get_casquota()
- def child_process(self):
- def progress():
- self.send_message('update-cache-size',
- self._casquota.get_cache_size())
- return self._casquota.clean(progress)
-
def handle_message(self, message_type, message):
# Update the cache size in the main process as we go,
# this provides better feedback in the UI.
@@ -48,3 +42,18 @@ class CleanupJob(Job):
if self._complete_cb:
self._complete_cb(status, result)
+
+ def create_child_job(self, *args, **kwargs):
+ return ChildCleanupJob(*args, casquota=self._scheduler.context.get_casquota(), **kwargs)
+
+
+class ChildCleanupJob(ChildJob):
+ def __init__(self, *args, casquota, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._casquota = casquota
+
+ def child_process(self):
+ def progress():
+ self.send_message('update-cache-size',
+ self._casquota.get_cache_size())
+ return self._casquota.clean(progress)
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index d6aa81567..2be0aa8f9 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -20,7 +20,7 @@ from ruamel import yaml
from ..._message import MessageType
-from .job import Job
+from .job import Job, ChildJob
# ElementJob()
@@ -80,6 +80,19 @@ class ElementJob(Job):
def element(self):
return self._element
+ def parent_complete(self, status, result):
+ self._complete_cb(self, self._element, status, self._result)
+
+ def create_child_job(self, *args, **kwargs):
+ return ChildElementJob(*args, element=self._element, action_cb=self._action_cb, **kwargs)
+
+
+class ChildElementJob(ChildJob):
+ def __init__(self, *args, element, action_cb, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._element = element
+ self._action_cb = action_cb
+
def child_process(self):
# Print the element's environment at the beginning of any element's log file.
@@ -94,9 +107,6 @@ class ElementJob(Job):
# Run the action
return self._action_cb(self._element)
- def parent_complete(self, status, result):
- self._complete_cb(self, self._element, status, self._result)
-
def child_process_data(self):
data = {}
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 203564af7..88156f3bf 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -75,10 +75,31 @@ class Process(multiprocessing.Process):
# Job()
#
-# The Job object represents a parallel task, when calling Job.spawn(),
-# the given `Job.child_process()` will be called in parallel to the
-# calling process, and `Job.parent_complete()` will be called with the
-# action result in the calling process when the job completes.
+# The Job object represents a task that will run in parallel to the main
+# process. It has some methods that are not implemented - they are meant for
+# you to implement in a subclass.
+#
+# It has a close relationship with the ChildJob class; together they can be
+# considered a two-part solution:
+#
+# 1. A Job instance, which will create a ChildJob instance and arrange for
+# childjob.child_process() to be executed in another process.
+# 2. The created ChildJob instance, which does the actual work.
+#
+# This split makes it clear what data is passed to the other process and what
+# is executed in which process.
+#
+# To set up a minimal new kind of Job, e.g. YourJob:
+#
+# 1. Create a YourJob class, inheriting from Job.
+# 2. Create a YourChildJob class, inheriting from ChildJob.
+# 3. Implement YourJob.create_child_job() and YourJob.parent_complete().
+# 4. Implement YourChildJob.child_process().
+#
+# A Job instance and its ChildJob share a message queue. You may send custom
+# messages to the main process using YourChildJob.send_message(). Such messages
+# must be processed in YourJob.handle_message(), which you will also need to
+# override for this purpose.
#
# Args:
# scheduler (Scheduler): The scheduler
@@ -126,8 +147,18 @@ class Job():
self._tries += 1
self._parent_start_listening()
+ child_job = self.create_child_job( # pylint: disable=assignment-from-no-return
+ self._scheduler,
+ self.action_name,
+ self._logfile,
+ self._max_retries,
+ self._tries,
+ self._message_unique_id,
+ self._task_id,
+ )
+
# Spawn the process
- self._process = Process(target=self._child_action, args=[self._queue])
+ self._process = Process(target=child_job.child_action, args=[self._queue])
# Block signals which are handled in the main process such that
# the child process does not inherit the parent's state, but the main
@@ -306,17 +337,6 @@ class Job():
self._scheduler.context.message(
Message(unique_id, message_type, message, **kwargs))
- # send_message()
- #
- # To be called from inside Job.child_process() implementations
- # to send messages to the main process during processing.
- #
- # These messages will be processed by the class's Job.handle_message()
- # implementation.
- #
- def send_message(self, message_type, message):
- self._queue.put(_Envelope(message_type, message))
-
#######################################################
# Abstract Methods #
#######################################################
@@ -329,7 +349,8 @@ class Job():
#
# Args:
# message_type (str): A string to identify the message type
- # message (any): A simple serializable object
+ # message (any): A simple object (must be pickle-able, i.e. strings,
+ # lists, dicts, numbers, but not Element instances).
#
# Returns:
# (bool): Should return a truthy value if message_type is handled.
@@ -339,8 +360,8 @@ class Job():
# parent_complete()
#
- # This will be executed after the job finishes, and is expected to
- # pass the result to the main thread.
+ # This will be executed in the main process after the job finishes, and is
+ # expected to pass the result to the main thread.
#
# Args:
# status (JobStatus): The job exit status
@@ -350,44 +371,28 @@ class Job():
raise ImplError("Job '{kind}' does not implement parent_complete()"
.format(kind=type(self).__name__))
- # child_process()
- #
- # This will be executed after fork(), and is intended to perform
- # the job's task.
+ # create_child_job()
#
- # Returns:
- # (any): A (simple!) object to be returned to the main thread
- # as the result.
+ # Called by a Job instance to create a child job.
#
- def child_process(self):
- raise ImplError("Job '{kind}' does not implement child_process()"
- .format(kind=type(self).__name__))
-
- # child_process_data()
+ # The child job object is an instance of a subclass of ChildJob.
#
- # Abstract method to retrieve additional data that should be
- # returned to the parent process. Note that the job result is
- # retrieved independently.
+ # The child job object's child_process() method will be executed in another
+ # process, so that work is done in parallel. See the documentation for the
+ # Job class for more information on this relationship.
#
- # Values can later be retrieved in Job.child_data.
+ # This method must be overridden by Job subclasses.
#
# Returns:
- # (dict) A dict containing values to be reported to the main process
+ # (ChildJob): An instance of a subclass of ChildJob.
#
- def child_process_data(self):
- return {}
+ def create_child_job(self, *args, **kwargs):
+ raise ImplError("Job '{kind}' does not implement create_child_job()"
+ .format(kind=type(self).__name__))
#######################################################
# Local Private Methods #
#######################################################
- #
- # Methods prefixed with the word 'child' take place in the child process
- #
- # Methods prefixed with the word 'parent' take place in the parent process
- #
- # Other methods can be called in both child or parent processes
- #
- #######################################################
# _parent_shutdown()
#
@@ -520,14 +525,125 @@ class Job():
self._scheduler.loop.remove_reader(self._queue._reader.fileno())
self._listening = False
- # _child_action()
+
+# ChildJob()
+#
+# The ChildJob object represents the part of a parallel task that will run in a
+# separate process. It has a close relationship with the parent Job that
+# created it.
+#
+# See the documentation of the Job class for more on their relationship, and
+# how to set up a (Job, ChildJob) pair.
+#
+# The args below are passed from the parent Job to the ChildJob.
+#
+# Args:
+# scheduler (Scheduler): The scheduler.
+# action_name (str): The queue action name.
+# logfile (str): A template string that points to the logfile
+# that should be used - should contain {pid}.
+# max_retries (int): The maximum number of retries.
+# tries (int): The number of tries so far, including the current one.
+# message_unique_id (int): None, or the id to be supplied to the Message() constructor.
+# task_id (int): None, or the plugin identifier for this job.
+#
+class ChildJob():
+
+ def __init__(
+ self, scheduler, action_name, logfile, max_retries, tries, message_unique_id, task_id):
+
+ self.action_name = action_name
+
+ self._scheduler = scheduler
+ self._logfile = logfile
+ self._max_retries = max_retries
+ self._tries = tries
+ self._message_unique_id = message_unique_id
+ self._task_id = task_id
+
+ self._queue = None
+
+ # message():
+ #
+ # Logs a message. This will be logged in the task's logfile and
+ # conditionally also be sent to the frontend.
+ #
+ # Args:
+ # message_type (MessageType): The type of message to send
+ # message (str): The message
+ # kwargs: Remaining Message() constructor arguments, note that you can
+ # override 'unique_id' this way.
+ #
+ def message(self, message_type, message, **kwargs):
+ kwargs['scheduler'] = True
+ unique_id = self._message_unique_id
+ if "unique_id" in kwargs:
+ unique_id = kwargs["unique_id"]
+ del kwargs["unique_id"]
+ self._scheduler.context.message(
+ Message(unique_id, message_type, message, **kwargs))
+
+ # send_message()
+ #
+ # Send data in a message to the parent Job, running in the main process.
+ #
+ # This allows for custom inter-process communication between subclasses of
+ # Job and ChildJob.
+ #
+ # These messages will be processed by the Job.handle_message()
+ # implementation, which may be overridden to support one or more custom
+ # 'message_type's.
+ #
+ # Args:
+ # message_type (str): The type of message to send.
+ # message_data (any): A simple object (must be pickle-able, i.e.
+ # strings, lists, dicts, numbers, but not Element
+ # instances). This is sent to the parent Job.
+ #
+ def send_message(self, message_type, message_data):
+ self._queue.put(_Envelope(message_type, message_data))
+
+ #######################################################
+ # Abstract Methods #
+ #######################################################
+
+ # child_process()
+ #
+ # This will be executed after fork(), and is intended to perform
+ # the job's task.
+ #
+ # Returns:
+ # (any): A simple object (must be pickle-able, i.e. strings, lists,
+ # dicts, numbers, but not Element instances). It is returned to
+ # the parent Job running in the main process. This is taken as
+ # the result of the Job.
+ #
+ def child_process(self):
+ raise ImplError("ChildJob '{kind}' does not implement child_process()"
+ .format(kind=type(self).__name__))
+
+ # child_process_data()
+ #
+ # Abstract method to retrieve additional data that should be
+ # returned to the parent process. Note that the job result is
+ # retrieved independently.
+ #
+ # Values can later be retrieved in Job.child_data.
+ #
+ # Returns:
+ # (dict) A dict containing values to be reported to the main process
+ #
+ def child_process_data(self):
+ return {}
+
+ # child_action()
#
# Perform the action in the child process, this calls the action_cb.
#
# Args:
# queue (multiprocessing.Queue): The message queue for IPC
#
- def _child_action(self, queue):
+ def child_action(self, queue):
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
@@ -590,7 +706,7 @@ class Job():
elapsed=elapsed, detail=e.detail,
logfile=filename, sandbox=e.sandbox)
- self._queue.put(_Envelope('child_data', self.child_process_data()))
+ self.send_message('child_data', self.child_process_data())
# Report the exception to the parent (for internal testing purposes)
self._child_send_error(e)
@@ -616,7 +732,7 @@ class Job():
else:
# No exception occurred in the action
- self._queue.put(_Envelope('child_data', self.child_process_data()))
+ self.send_message('child_data', self.child_process_data())
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
@@ -628,6 +744,10 @@ class Job():
# is already busy in sys.exit()
self._child_shutdown(RC_OK)
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+
# _child_send_error()
#
# Sends an error to the main process through the message queue
@@ -643,26 +763,26 @@ class Job():
domain = e.domain
reason = e.reason
- envelope = _Envelope('error', {
+ self.send_message('error', {
'domain': domain,
'reason': reason
})
- self._queue.put(envelope)
# _child_send_result()
#
# Sends the serialized result to the main process through the message queue
#
# Args:
- # result (object): A simple serializable object, or None
+ # result (any): None, or a simple object (must be pickle-able, i.e.
+ # strings, lists, dicts, numbers, but not Element
+ # instances).
#
# Note: If None is passed here, nothing needs to be sent, the
# result member in the parent process will simply remain None.
#
def _child_send_result(self, result):
if result is not None:
- envelope = _Envelope('result', result)
- self._queue.put(envelope)
+ self.send_message('result', result)
# _child_shutdown()
#
@@ -698,4 +818,4 @@ class Job():
if message.message_type == MessageType.LOG:
return
- self._queue.put(_Envelope('message', message))
+ self.send_message('message', message)