author     Tristan Maat <tristan.maat@codethink.co.uk>  2018-07-16 15:31:55 +0100
committer  Tristan Maat <tristan.maat@codethink.co.uk>  2018-07-16 17:37:06 +0100
commit     7094526b28db94dd8a9e190610b73a228effb7b6 (patch)
tree       c640e93caa3e7a258562d0cb531053f7b7fa08f5
parent     70cd94afa9a05ef0005cf85aa0db1d795b64bbd3 (diff)
download   buildstream-7094526b28db94dd8a9e190610b73a228effb7b6.tar.gz
Make Jobs abstract and element-independent
-rw-r--r--  buildstream/_frontend/app.py                 35
-rw-r--r--  buildstream/_frontend/status.py              27
-rw-r--r--  buildstream/_scheduler/__init__.py            2
-rw-r--r--  buildstream/_scheduler/jobs/__init__.py       1
-rw-r--r--  buildstream/_scheduler/jobs/elementjob.py   217
-rw-r--r--  buildstream/_scheduler/jobs/job.py          351   (renamed from buildstream/_scheduler/job.py)
-rw-r--r--  buildstream/_scheduler/queues/__init__.py     2
-rw-r--r--  buildstream/_scheduler/queues/buildqueue.py   7
-rw-r--r--  buildstream/_scheduler/queues/fetchqueue.py   7
-rw-r--r--  buildstream/_scheduler/queues/pullqueue.py    7
-rw-r--r--  buildstream/_scheduler/queues/pushqueue.py    7
-rw-r--r--  buildstream/_scheduler/queues/queue.py      136
-rw-r--r--  buildstream/_scheduler/queues/trackqueue.py   7
-rw-r--r--  buildstream/_scheduler/resources.py         105
-rw-r--r--  buildstream/_scheduler/scheduler.py         213
15 files changed, 758 insertions(+), 366 deletions(-)
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 4675b0eb0..de910afe7 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -492,30 +492,37 @@ class App():
def _tick(self, elapsed):
self._maybe_render_status()
- def _job_started(self, element, action_name):
- self._status.add_job(element, action_name)
+ def _job_started(self, job):
+ self._status.add_job(job)
self._maybe_render_status()
- def _job_completed(self, element, queue, action_name, success):
- self._status.remove_job(element, action_name)
+ def _job_completed(self, job, success):
+ self._status.remove_job(job)
self._maybe_render_status()
 # Don't attempt to handle a failure if the user has already opted to
# terminate
if not success and not self.stream.terminated:
- # Get the last failure message for additional context
- failure = self._fail_messages.get(element._get_unique_id())
+ if hasattr(job, 'element'):
+ element = job.element
+ queue = job.queue
- # XXX This is dangerous, sometimes we get the job completed *before*
- # the failure message reaches us ??
- if not failure:
- self._status.clear()
- click.echo("\n\n\nBUG: Message handling out of sync, " +
- "unable to retrieve failure message for element {}\n\n\n\n\n"
- .format(element), err=True)
+ # Get the last failure message for additional context
+ failure = self._fail_messages.get(element._get_unique_id())
+
+ # XXX This is dangerous, sometimes we get the job completed *before*
+ # the failure message reaches us ??
+ if not failure:
+ self._status.clear()
+ click.echo("\n\n\nBUG: Message handling out of sync, " +
+ "unable to retrieve failure message for element {}\n\n\n\n\n"
+ .format(element), err=True)
+ else:
+ self._handle_failure(element, queue, failure)
else:
- self._handle_failure(element, queue, failure)
+ click.echo("\nTerminating all jobs\n", err=True)
+ self.stream.terminate()
def _handle_failure(self, element, queue, failure):
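
The frontend callbacks above now receive a Job object instead of an (element, action_name) pair, and use duck typing, hasattr(job, 'element'), to decide whether element-specific failure handling applies. A minimal, self-contained sketch of that dispatch pattern (the classes and print calls here are illustrative stand-ins, not BuildStream code):

    class Job:
        def __init__(self, action_name):
            self.action_name = action_name

    class ElementJob(Job):
        def __init__(self, action_name, element, queue):
            super().__init__(action_name)
            self.element = element  # only element jobs carry these attributes
            self.queue = queue

    def job_completed(job, success):
        # Mirrors App._job_completed(): element-aware failure handling
        # only happens when the job actually wraps an element.
        if success:
            return
        if hasattr(job, 'element'):
            print("job '{}' failed for element {}".format(job.action_name, job.element))
        else:
            print("job '{}' failed, terminating all jobs".format(job.action_name))
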
diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py
index 3f66e009a..7a2e71969 100644
--- a/buildstream/_frontend/status.py
+++ b/buildstream/_frontend/status.py
@@ -77,9 +77,9 @@ class Status():
- # element (Element): The element of the job to track
- # action_name (str): The action name for this job
+ # job (Job): The job to track
 #
- def add_job(self, element, action_name):
+ def add_job(self, job):
elapsed = self._stream.elapsed_time
- job = _StatusJob(self._context, element, action_name, self._content_profile, self._format_profile, elapsed)
+ job = _StatusJob(self._context, job, self._content_profile, self._format_profile, elapsed)
self._jobs.append(job)
self._need_alloc = True
@@ -91,7 +91,13 @@ class Status():
- # element (Element): The element of the job to track
- # action_name (str): The action name for this job
+ # job (Job): The job to stop tracking
 #
- def remove_job(self, element, action_name):
+ def remove_job(self, job):
+ action_name = job.action_name
+ element = getattr(job, 'element', None)
+
self._jobs = [
job for job in self._jobs
if not (job.element is element and
@@ -358,15 +364,19 @@ class _StatusHeader():
#
# Args:
# context (Context): The Context
-# element (Element): The element being processed
-# action_name (str): The name of the action
+# job (Job): The job being processed
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
# elapsed (datetime): The offset into the session when this job is created
#
class _StatusJob():
- def __init__(self, context, element, action_name, content_profile, format_profile, elapsed):
+ def __init__(self, context, job, content_profile, format_profile, elapsed):
+ action_name = job.action_name
+ element = getattr(job, 'element', None)
#
# Public members
@@ -374,6 +384,7 @@ class _StatusJob():
self.element = element # The Element
self.action_name = action_name # The action name
self.size = None # The number of characters required to render
+ self.full_name = element._get_full_name() if element else action_name
#
# Private members
@@ -386,7 +397,7 @@ class _StatusJob():
# Calculate the size needed to display
self.size = 10 # Size of time code with brackets
self.size += len(action_name)
- self.size += len(element._get_full_name())
+ self.size += len(self.full_name)
self.size += 3 # '[' + ':' + ']'
# render()
@@ -403,7 +414,7 @@ class _StatusJob():
self._format_profile.fmt(']')
# Add padding after the display name, before terminating ']'
- name = self.element._get_full_name() + (' ' * padding)
+ name = self.full_name + (' ' * padding)
text += self._format_profile.fmt('[') + \
self._content_profile.fmt(self.action_name) + \
self._format_profile.fmt(':') + \
diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py
index a53a133c2..8e1140ffe 100644
--- a/buildstream/_scheduler/__init__.py
+++ b/buildstream/_scheduler/__init__.py
@@ -17,7 +17,7 @@
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
-from .queues import Queue, QueueStatus, QueueType
+from .queues import Queue, QueueStatus
from .queues.fetchqueue import FetchQueue
from .queues.trackqueue import TrackQueue
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
new file mode 100644
index 000000000..0030f5c97
--- /dev/null
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -0,0 +1 @@
+from .elementjob import ElementJob
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
new file mode 100644
index 000000000..68f4e0406
--- /dev/null
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -0,0 +1,217 @@
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author:
+# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from ruamel import yaml
+
+from ..._message import Message, MessageType
+from ...plugin import _plugin_lookup
+from ... import _signals
+
+from .job import Job
+
+
+# ElementJob()
+#
+# A job to run an element's commands. When this job is spawned
+# `action_cb` will be called, and when it completes `complete_cb` will
+# be called.
+#
+# Args:
+# scheduler (Scheduler): The scheduler
+# action_name (str): The queue action name
+# max_retries (int): The maximum number of retries
+# action_cb (callable): The function to execute on the child
+# complete_cb (callable): The function to execute when the job completes
+# element (Element): The element to work on
+# kwargs: Remaining Job() constructor arguments
+#
+# Here is the calling signature of the action_cb:
+#
+# action_cb():
+#
+# This function will be called in the child task
+#
+# Args:
+# element (Element): The element passed to the Job() constructor
+#
+# Returns:
+# (object): Any simple Python object, including a string, int,
+# bool, list or dict; this must be a simple, serializable object.
+#
+# Here is the calling signature of the complete_cb:
+#
+# complete_cb():
+#
+# This function will be called when the child task completes
+#
+# Args:
+# job (Job): The job object which completed
+# element (Element): The element passed to the Job() constructor
+# success (bool): True if the action_cb did not raise an exception
+# result (object): The deserialized object returned by the `action_cb`, or None
+# if `success` is False
+#
+class ElementJob(Job):
+ def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.queue = queue
+ self._element = element
+ self._action_cb = action_cb # The action callable function
+ self._complete_cb = complete_cb # The complete callable function
+
+ @property
+ def element(self):
+ return self._element
+
+ # _child_process()
+ #
+ # This will be executed after fork(), and is intended to perform
+ # the job's task.
+ #
+ # Returns:
+ # (any): A (simple!) object to be returned to the main thread
+ # as the result.
+ #
+ def _child_process(self):
+ return self._action_cb(self._element)
+
+ def _parent_complete(self, success, result):
+ self._complete_cb(self, self._element, success, result)
+
+ # _child_logging_enabled()
+ #
+ # Start the log for this job. This function will be given a
+ # template string for the path to a log file - this will contain
+ # "{pid}", which should be replaced with the current process'
+ # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
+ #
+ # Args:
+ # logfile (str): A template string that points to the logfile
+ # that should be used - replace {pid} first.
+ #
+ # Yields:
+ # (str) The path to the logfile with {pid} replaced.
+ #
+ @contextmanager
+ def _child_logging_enabled(self, logfile):
+ self._logfile = logfile.format(pid=os.getpid())
+
+ with open(self._logfile, 'a') as log:
+ # Write one last line to the log and flush it to disk
+ def flush_log():
+
+ # If the process currently has something happening in the I/O stack,
+ # then trying to re-enter the I/O stack will raise a RuntimeError.
+ #
+ # So just try to flush as well as we can at SIGTERM time
+ try:
+ # FIXME: Better logging
+
+ log.write('\n\nAction {} for element {} forcefully terminated\n'
+ .format(self.action_name, self._element.name))
+ log.flush()
+ except RuntimeError:
+ os.fsync(log.fileno())
+
+ self._element._set_log_handle(log)
+ with _signals.terminator(flush_log):
+ self._print_start_message(self._element, self._logfile)
+ yield self._logfile
+ self._element._set_log_handle(None)
+ self._logfile = None
+
+ # _message():
+ #
+ # Sends a message to the frontend
+ #
+ # Args:
+ # message_type (MessageType): The type of message to send
+ # message (str): The message
+ # kwargs: Remaining Message() constructor arguments
+ #
+ def _message(self, message_type, message, **kwargs):
+ args = dict(kwargs)
+ args['scheduler'] = True
+ self._scheduler.context.message(
+ Message(self._element._get_unique_id(),
+ message_type,
+ message,
+ **args))
+
+ def _print_start_message(self, element, logfile):
+ self._message(MessageType.START, self.action_name, logfile=logfile)
+
+ # Print the element's environment at the beginning of any element's log file.
+ #
+ # This should probably be omitted for non-build tasks but it's harmless here
+ elt_env = element.get_environment()
+ env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
+ self._message(MessageType.LOG,
+ "Build environment for element {}".format(element.name),
+ detail=env_dump, logfile=logfile)
+
+ # _child_log()
+ #
+ # Log a message returned by the frontend's main message handler
+ # and return it to the main process.
+ #
+ # Arguments:
+ # message (str): The message to log
+ #
+ # Returns:
+ # message (Message): A message object
+ #
+ def _child_log(self, message):
+ # Tag them on the way out the door...
+ message.action_name = self.action_name
+ message.task_id = self._element._get_unique_id()
+
+ # Use the plugin for the task for the output, not a plugin
+ # which might be acting on behalf of the task
+ plugin = _plugin_lookup(message.task_id)
+
+ with plugin._output_file() as output:
+ message_text = self._format_frontend_message(message, '[{}]'.format(plugin.name))
+ output.write('{}\n'.format(message_text))
+ output.flush()
+
+ return message
+
+ # _child_process_data()
+ #
+ # Abstract method to retrieve additional data that should be
+ # returned to the parent process. Note that the job result is
+ # retrieved independently.
+ #
+ # Values can later be retrieved in Job.child_data.
+ #
+ # Returns:
+ # (dict) A dict containing values later to be read by _process_sync_data
+ #
+ def _child_process_data(self):
+ data = {}
+
+ workspace = self._element._get_workspace()
+
+ if workspace is not None:
+ data['workspace'] = workspace.to_dict()
+
+ return data
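
ElementJob keeps the old callback-driven interface on top of the new abstract Job: action_cb runs in the forked child with the element as its only argument, and complete_cb runs in the parent once the child exits. A hedged usage sketch, assuming a scheduler, element, queue and logfile template are already in hand (it mirrors create_job() in queue.py below; the callback bodies are illustrative):

    def track_element(element):
        # Child side: must return a simple, serializable object.
        return element._track()

    def track_done(job, element, success, result):
        # Parent side: receives the deserialized result, or None on failure.
        if success:
            print("tracked", element.name)

    job = ElementJob(scheduler, "Track", logfile,
                     element=element, queue=queue,
                     resources=[ResourceType.DOWNLOAD],
                     action_cb=track_element,
                     complete_cb=track_done,
                     max_retries=scheduler.context.sched_network_retries)
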
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/jobs/job.py
index 8b9af9393..580f9ff09 100644
--- a/buildstream/_scheduler/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -26,20 +26,21 @@ import datetime
import traceback
import asyncio
import multiprocessing
-from ruamel import yaml
+from contextlib import contextmanager
+
+import psutil
# BuildStream toplevel imports
-from .._exceptions import BstError, set_last_task_error
-from .._message import Message, MessageType, unconditional_messages
-from ..plugin import _plugin_lookup
-from .. import _signals, utils
+from ..._exceptions import ImplError, BstError, set_last_task_error
+from ..._message import MessageType, unconditional_messages
+from ... import _signals, utils
# Used to distinguish between status messages and return values
class Envelope():
def __init__(self, message_type, message):
- self.message_type = message_type
- self.message = message
+ self._message_type = message_type
+ self._message = message
# Process class that doesn't call waitpid on its own.
@@ -54,54 +55,50 @@ class Process(multiprocessing.Process):
# Job()
#
# The Job object represents a parallel task, when calling Job.spawn(),
-# the given `action_cb` will be called in parallel to the calling process,
-# and `complete_cb` will be called with the action result in the calling
-# process when the job completes.
+# the given `Job._child_process` will be called in parallel to the
+# calling process, and `Job._parent_complete` will be called with the
+# action result in the calling process when the job completes.
#
# Args:
# scheduler (Scheduler): The scheduler
-# element (Element): The element to operate on
# action_name (str): The queue action name
-# action_cb (callable): The action function
-# complete_cb (callable): The function to call when complete
+# logfile (str): A template string that points to the logfile
+# that should be used - should contain {pid}.
+# resources (iter(ResourceType)): A set of resources this job
+# wants to use.
+# exclusive_resources (iter(ResourceType)): A set of resources
+# this job wants to use exclusively.
# max_retries (int): The maximum number of retries
#
-# Here is the calling signature of the action_cb:
-#
-# action_cb():
-#
-# This function will be called in the child task
-#
-# Args:
-# element (Element): The element passed to the Job() constructor
-#
-# Returns:
-# (object): Any abstract simple python object, including a string, int,
-# bool, list or dict, this must be a simple serializable object.
-#
-# Here is the calling signature of the complete_cb:
-#
-# complete_cb():
-#
-# This function will be called when the child task completes
-#
-# Args:
-# job (Job): The job object which completed
-# element (Element): The element passed to the Job() constructor
-# success (bool): True if the action_cb did not raise an exception
-# result (object): The deserialized object returned by the `action_cb`, or None
-# if `success` is False
-#
class Job():
- def __init__(self, scheduler, element, action_name, action_cb, complete_cb, *, max_retries=0):
+ def __init__(self, scheduler, action_name, logfile, *,
+ resources=None, exclusive_resources=None, max_retries=0):
+
+ if resources is None:
+ resources = set()
+ else:
+ resources = set(resources)
+ if exclusive_resources is None:
+ exclusive_resources = set()
+ else:
+ exclusive_resources = set(exclusive_resources)
+
+ # Ensure nobody tries to use a resource exclusively without also requesting it as a regular resource.
+ assert exclusive_resources <= resources, "All exclusive resources must also be resources!"
#
# Public members
#
- self.element = element # The element we're processing
self.action_name = action_name # The action name for the Queue
- self.workspace_dict = None # A serialized Workspace object, after any modifications
+ self.child_data = None # Data to be sent to the main process
+
+ # The resources this job wants to access
+ self.resources = resources
+ # Resources this job needs to access exclusively, i.e., no
+ # other job should be allowed to access them
+ self.exclusive_resources = exclusive_resources
#
# Private members
@@ -110,13 +107,12 @@ class Job():
self._queue = multiprocessing.Queue() # A message passing queue
self._process = None # The Process object
self._watcher = None # Child process watcher
- self._action_cb = action_cb # The action callable function
- self._complete_cb = complete_cb # The complete callable function
self._listening = False # Whether the parent is currently listening
self._suspended = False # Whether this job is currently suspended
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
+ self._logfile = logfile
# spawn()
#
@@ -152,8 +148,7 @@ class Job():
# First resume the job if it's suspended
self.resume(silent=True)
- self._message(self.element, MessageType.STATUS,
- "{} terminating".format(self.action_name))
+ self._message(MessageType.STATUS, "{} terminating".format(self.action_name))
# Make sure there is no garbage on the queue
self._parent_stop_listening()
@@ -184,9 +179,15 @@ class Job():
def kill(self):
# Force kill
- self._message(self.element, MessageType.WARN,
+ self._message(MessageType.WARN,
"{} did not terminate gracefully, killing".format(self.action_name))
- utils._kill_process_tree(self._process.pid)
+
+ try:
+ utils._kill_process_tree(self._process.pid)
+ # This can happen if the process died of its own accord before
+ # we try to kill it
+ except psutil.NoSuchProcess:
+ return
# suspend()
#
@@ -194,7 +195,7 @@ class Job():
#
def suspend(self):
if not self._suspended:
- self._message(self.element, MessageType.STATUS,
+ self._message(MessageType.STATUS,
"{} suspending".format(self.action_name))
try:
@@ -219,42 +220,152 @@ class Job():
def resume(self, silent=False):
if self._suspended:
if not silent:
- self._message(self.element, MessageType.STATUS,
+ self._message(MessageType.STATUS,
"{} resuming".format(self.action_name))
os.kill(self._process.pid, signal.SIGCONT)
self._suspended = False
#######################################################
- # Local Private Methods #
+ # Abstract Methods #
#######################################################
+ # _parent_complete()
#
- # Methods prefixed with the word 'child' take place in the child process
+ # This will be executed after the job finishes, and is expected to
+ # pass the result to the main thread.
#
- # Methods prefixed with the word 'parent' take place in the parent process
+ # Args:
+ # success (bool): Whether the job was successful.
+ # result (any): The result returned by _child_process().
#
- # Other methods can be called in both child or parent processes
+ def _parent_complete(self, success, result):
+ raise ImplError("Job '{kind}' does not implement _parent_complete()"
+ .format(kind=type(self).__name__))
+
+ # _child_process()
#
- #######################################################
+ # This will be executed after fork(), and is intended to perform
+ # the job's task.
+ #
+ # Returns:
+ # (any): A (simple!) object to be returned to the main thread
+ # as the result.
+ #
+ def _child_process(self):
+ raise ImplError("Job '{kind}' does not implement _child_process()"
+ .format(kind=type(self).__name__))
+
+ # _child_logging_enabled()
+ #
+ # Start the log for this job. This function will be given a
+ # template string for the path to a log file - this will contain
+ # "{pid}", which should be replaced with the current process'
+ # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
+ #
+ # Args:
+ # logfile (str): A template string that points to the logfile
+ # that should be used - replace {pid} first.
+ #
+ # Yields:
+ # (str) The path to the logfile with {pid} replaced.
+ #
+ @contextmanager
+ def _child_logging_enabled(self, logfile):
+ raise ImplError("Job '{kind}' does not implement _child_logging_enabled()"
+ .format(kind=type(self).__name__))
# _message():
#
# Sends a message to the frontend
#
# Args:
- # plugin (Plugin): The plugin to send a message for
# message_type (MessageType): The type of message to send
# message (str): The message
# kwargs: Remaining Message() constructor arguments
#
- def _message(self, plugin, message_type, message, **kwargs):
- args = dict(kwargs)
- args['scheduler'] = True
- self._scheduler.context.message(
- Message(plugin._get_unique_id(),
- message_type,
- message,
- **args))
+ def _message(self, message_type, message, **kwargs):
+ raise ImplError("Job '{kind}' does not implement _message()"
+ .format(kind=type(self).__name__))
+
+ # _child_process_data()
+ #
+ # Abstract method to retrieve additional data that should be
+ # returned to the parent process. Note that the job result is
+ # retrieved independently.
+ #
+ # Values can later be retrieved in Job.child_data.
+ #
+ # Returns:
+ # (dict) A dict containing values later to be read by _process_sync_data
+ #
+ def _child_process_data(self):
+ return {}
+
+ # _child_log()
+ #
+ # Log a message returned by the frontend's main message handler
+ # and return it to the main process.
+ #
+ # This method is also expected to add process-specific information
+ # to the message (notably, action_name and task_id).
+ #
+ # Arguments:
+ # message (str): The message to log
+ #
+ # Returns:
+ # message (Message): A message object
+ #
+ def _child_log(self, message):
+ raise ImplError("Job '{kind}' does not implement _child_log()"
+ .format(kind=type(self).__name__))
+
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+ #
+ # Methods prefixed with the word 'child' take place in the child process
+ #
+ # Methods prefixed with the word 'parent' take place in the parent process
+ #
+ # Other methods can be called in both child or parent processes
+ #
+ #######################################################
+
+ # _format_frontend_message()
+ #
+ # Format a message from the frontend for logging purposes. This
+ # will prepend a time code and add other information to help
+ # determine what happened.
+ #
+ # Args:
+ # message (Message) - The message to create a text from.
+ # name (str) - A name for the executing context.
+ #
+ # Returns:
+ # (str) The text to log.
+ #
+ def _format_frontend_message(self, message, name):
+ INDENT = "    "
+ EMPTYTIME = "--:--:--"
+ template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
+
+ detail = ''
+ if message.detail is not None:
+ template += "\n\n{detail}"
+ detail = message.detail.rstrip('\n')
+ detail = INDENT + INDENT.join(detail.splitlines(True))
+
+ timecode = EMPTYTIME
+ if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+ hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+ minutes, seconds = divmod(remainder, 60)
+ timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+ return template.format(timecode=timecode,
+ type=message.message_type.upper(),
+ name=name,
+ message=message.message,
+ detail=detail)
# _child_action()
#
@@ -265,7 +376,7 @@ class Job():
#
def _child_action(self, queue):
- element = self.element
+ logfile = self._logfile
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
@@ -301,35 +412,24 @@ class Job():
 # Time, log and run the action function
#
with _signals.suspendable(stop_time, resume_time), \
- element._logging_enabled(self.action_name) as filename:
-
- self._message(element, MessageType.START, self.action_name, logfile=filename)
-
- # Print the element's environment at the beginning of any element's log file.
- #
- # This should probably be omitted for non-build tasks but it's harmless here
- elt_env = element.get_environment()
- env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
- self._message(element, MessageType.LOG,
- "Build environment for element {}".format(element.name),
- detail=env_dump, logfile=filename)
+ self._child_logging_enabled(logfile) as filename:
try:
# Try the task action
- result = self._action_cb(element)
+ result = self._child_process()
except BstError as e:
elapsed = datetime.datetime.now() - starttime
if self._tries <= self._max_retries:
- self._message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self._tries),
+ self._message(MessageType.FAIL,
+ "Try #{} failed, retrying".format(self._tries),
elapsed=elapsed)
else:
- self._message(element, MessageType.FAIL, str(e),
+ self._message(MessageType.FAIL, str(e),
elapsed=elapsed, detail=e.detail,
logfile=filename, sandbox=e.sandbox)
- # Report changes in the workspace, even if there was a handled failure
- self._child_send_workspace()
+ self._queue.put(Envelope('child_data', self._child_process_data()))
# Report the exception to the parent (for internal testing purposes)
self._child_send_error(e)
@@ -343,18 +443,19 @@ class Job():
#
elapsed = datetime.datetime.now() - starttime
detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
- self._message(element, MessageType.BUG, self.action_name,
+
+ self._message(MessageType.BUG, self.action_name,
elapsed=elapsed, detail=detail,
logfile=filename)
self._child_shutdown(1)
else:
# No exception occurred in the action
- self._child_send_workspace()
+ self._queue.put(Envelope('child_data', self._child_process_data()))
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
- self._message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed,
+ self._message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
logfile=filename)
# Shutdown needs to stay outside of the above context manager,
@@ -398,16 +499,6 @@ class Job():
envelope = Envelope('result', result)
self._queue.put(envelope)
- # _child_send_workspace()
- #
- # Sends the serialized workspace through the message queue, if any
- #
- def _child_send_workspace(self):
- workspace = self.element._get_workspace()
- if workspace:
- envelope = Envelope('workspace', workspace.to_dict())
- self._queue.put(envelope)
-
# _child_shutdown()
#
# Shuts down the child process by cleaning up and exiting the process
@@ -419,44 +510,6 @@ class Job():
self._queue.close()
sys.exit(exit_code)
- # _child_log()
- #
- # Logs a Message to the process's dedicated log file
- #
- # Args:
- # plugin (Plugin): The plugin to log for
- # message (Message): The message to log
- #
- def _child_log(self, plugin, message):
-
- with plugin._output_file() as output:
- INDENT = " "
- EMPTYTIME = "--:--:--"
-
- name = '[' + plugin.name + ']'
-
- fmt = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
- detail = ''
- if message.detail is not None:
- fmt += "\n\n{detail}"
- detail = message.detail.rstrip('\n')
- detail = INDENT + INDENT.join(detail.splitlines(True))
-
- timecode = EMPTYTIME
- if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
- hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 * 60)
- minutes, seconds = divmod(remainder, 60)
- timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
- message_text = fmt.format(timecode=timecode,
- type=message.message_type.upper(),
- name=name,
- message=message.message,
- detail=detail)
-
- output.write('{}\n'.format(message_text))
- output.flush()
-
# _child_message_handler()
#
# A Context delegate for handling messages, this replaces the
@@ -470,16 +523,8 @@ class Job():
#
def _child_message_handler(self, message, context):
- # Tag them on the way out the door...
- message.action_name = self.action_name
- message.task_id = self.element._get_unique_id()
-
- # Use the plugin for the task for the output, not a plugin
- # which might be acting on behalf of the task
- plugin = _plugin_lookup(message.task_id)
-
# Log first
- self._child_log(plugin, message)
+ message = self._child_log(message)
if message.message_type == MessageType.FAIL and self._tries <= self._max_retries:
# Job will be retried, display failures as warnings in the frontend
@@ -519,7 +564,8 @@ class Job():
self.spawn()
return
- self._complete_cb(self, self.element, returncode == 0, self._result)
+ self._parent_complete(returncode == 0, self._result)
+ self._scheduler.job_completed(self, returncode == 0)
# _parent_process_envelope()
#
@@ -536,21 +582,22 @@ class Job():
if not self._listening:
return
- if envelope.message_type == 'message':
+ if envelope._message_type == 'message':
# Propagate received messages from children
# back through the context.
- self._scheduler.context.message(envelope.message)
- elif envelope.message_type == 'error':
+ self._scheduler.context.message(envelope._message)
+ elif envelope._message_type == 'error':
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
- set_last_task_error(envelope.message['domain'],
- envelope.message['reason'])
- elif envelope.message_type == 'result':
+ set_last_task_error(envelope._message['domain'],
+ envelope._message['reason'])
+ elif envelope._message_type == 'result':
assert self._result is None
- self._result = envelope.message
- elif envelope.message_type == 'workspace':
- self.workspace_dict = envelope.message
+ self._result = envelope._message
+ elif envelope._message_type == 'child_data':
+ # If we retry a job, we assign a new value to this
+ self.child_data = envelope._message
else:
raise Exception()
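
With this change Job is an abstract base class: any parallel task can be run by subclassing it and implementing the hooks documented above, with no Element involved. A minimal, hypothetical subclass (not part of this commit) that satisfies the contract:

    import os
    from contextlib import contextmanager

    class TrivialJob(Job):
        def _child_process(self):
            # Runs after fork(); the return value must be a simple,
            # serializable object, sent back via the message queue.
            return 42

        def _parent_complete(self, success, result):
            # Runs in the parent when the child has exited.
            print("job finished:", success, result)

        @contextmanager
        def _child_logging_enabled(self, logfile):
            path = logfile.format(pid=os.getpid())
            with open(path, 'a'):
                yield path

        def _message(self, message_type, message, **kwargs):
            pass  # a real job forwards this to the frontend via the context

        def _child_log(self, message):
            return message  # a real job tags and writes the message first
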
diff --git a/buildstream/_scheduler/queues/__init__.py b/buildstream/_scheduler/queues/__init__.py
index b9acef18c..3b2293919 100644
--- a/buildstream/_scheduler/queues/__init__.py
+++ b/buildstream/_scheduler/queues/__init__.py
@@ -1 +1 @@
-from .queue import Queue, QueueStatus, QueueType
+from .queue import Queue, QueueStatus
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 50ba312ff..7f8ac9e8f 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -18,7 +18,8 @@
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
# Jürg Billeter <juerg.billeter@codethink.co.uk>
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
# A queue which assembles elements
@@ -27,7 +28,7 @@ class BuildQueue(Queue):
action_name = "Build"
complete_name = "Built"
- queue_type = QueueType.BUILD
+ resources = [ResourceType.PROCESS]
def process(self, element):
element._assemble()
@@ -50,7 +51,7 @@ class BuildQueue(Queue):
return QueueStatus.READY
- def done(self, element, result, success):
+ def done(self, job, element, result, success):
if success:
# Inform element in main process that assembly is done
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index bdff15667..265890b7a 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -22,7 +22,8 @@
from ... import Consistency
# Local imports
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
# A queue which fetches element sources
@@ -31,7 +32,7 @@ class FetchQueue(Queue):
action_name = "Fetch"
complete_name = "Fetched"
- queue_type = QueueType.FETCH
+ resources = [ResourceType.DOWNLOAD]
def __init__(self, scheduler, skip_cached=False):
super().__init__(scheduler)
@@ -66,7 +67,7 @@ class FetchQueue(Queue):
return QueueStatus.READY
- def done(self, element, result, success):
+ def done(self, _, element, result, success):
if not success:
return False
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index b4f5b0d73..efaa59ef3 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -19,7 +19,8 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
# Local imports
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
# A queue which pulls element artifacts
@@ -28,7 +29,7 @@ class PullQueue(Queue):
action_name = "Pull"
complete_name = "Pulled"
- queue_type = QueueType.FETCH
+ resources = [ResourceType.UPLOAD]
def process(self, element):
# returns whether an artifact was downloaded or not
@@ -51,7 +52,7 @@ class PullQueue(Queue):
else:
return QueueStatus.SKIP
- def done(self, element, result, success):
+ def done(self, _, element, result, success):
if not success:
return False
diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
index 624eefd1d..568e053d6 100644
--- a/buildstream/_scheduler/queues/pushqueue.py
+++ b/buildstream/_scheduler/queues/pushqueue.py
@@ -19,7 +19,8 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
# Local imports
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
# A queue which pushes element artifacts
@@ -28,7 +29,7 @@ class PushQueue(Queue):
action_name = "Push"
complete_name = "Pushed"
- queue_type = QueueType.PUSH
+ resources = [ResourceType.UPLOAD]
def process(self, element):
# returns whether an artifact was uploaded or not
@@ -40,7 +41,7 @@ class PushQueue(Queue):
return QueueStatus.READY
- def done(self, element, result, success):
+ def done(self, _, element, result, success):
if not success:
return False
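
Each concrete queue now declares the resource types its jobs consume instead of a QueueType token, and Queue.__init__ (see queue.py below) enables network retries automatically whenever DOWNLOAD or UPLOAD appears in the list. A hypothetical queue written against the new scheme, following the same pattern as the queues above:

    class ExportQueue(Queue):
        action_name = "Export"
        complete_name = "Exported"
        resources = [ResourceType.UPLOAD]  # network resource: retries enabled in __init__

        def status(self, element):
            return QueueStatus.READY

        def process(self, element):
            # Runs in the child; the return value reaches done() as `result`.
            return element.name

        def done(self, job, element, result, success):
            return success
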
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index d0c482802..8ca3ac063 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -19,32 +19,20 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
# System imports
+import os
from collections import deque
from enum import Enum
import traceback
# Local imports
-from ..job import Job
+from ..jobs import ElementJob
+from ..resources import ResourceType
# BuildStream toplevel imports
from ..._exceptions import BstError, set_last_task_error
from ..._message import Message, MessageType
-# Indicates the kind of activity
-#
-#
-class QueueType():
- # Tasks which download stuff from the internet
- FETCH = 1
-
- # CPU/Disk intensive tasks
- BUILD = 2
-
- # Tasks which upload stuff to the internet
- PUSH = 3
-
-
# Queue status for a given element
#
#
@@ -69,14 +57,13 @@ class Queue():
# These should be overridden on class data of of concrete Queue implementations
action_name = None
complete_name = None
- queue_type = None
+ resources = [] # Resources this queue's jobs want
def __init__(self, scheduler):
#
# Public members
#
- self.active_jobs = [] # List of active ongoing Jobs, for scheduler observation
self.failed_elements = [] # List of failed elements, for the frontend
self.processed_elements = [] # List of processed elements, for the frontend
self.skipped_elements = [] # List of skipped elements, for the frontend
@@ -88,13 +75,13 @@ class Queue():
self._wait_queue = deque()
self._done_queue = deque()
self._max_retries = 0
- if self.queue_type == QueueType.FETCH or self.queue_type == QueueType.PUSH:
- self._max_retries = scheduler.context.sched_network_retries
# Assert the subclass has setup class data
assert self.action_name is not None
assert self.complete_name is not None
- assert self.queue_type is not None
+
+ if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources:
+ self._max_retries = scheduler.context.sched_network_retries
#####################################################
# Abstract Methods for Queue implementations #
@@ -143,6 +130,7 @@ class Queue():
# Abstract method for handling a successful job completion.
#
# Args:
+ # job (Job): The job which completed processing
# element (Element): The element which completed processing
# result (any): The return value of the process() implementation
# success (bool): True if the process() implementation did not
@@ -152,7 +140,7 @@ class Queue():
 # (bool): True if the element should appear to be processed,
# Otherwise False will count the element as "skipped"
#
- def done(self, element, result, success):
+ def done(self, job, element, result, success):
pass
#####################################################
@@ -170,10 +158,22 @@ class Queue():
if not elts:
return
+ # Note: The internal lists work with jobs. This is not
+ # reflected in any external methods (except
+ # pop/peek_ready_jobs).
+ def create_job(element):
+ logfile = self._element_log_path(element)
+ return ElementJob(self._scheduler, self.action_name,
+ logfile, element=element, queue=self,
+ resources=self.resources,
+ action_cb=self.process,
+ complete_cb=self._job_done,
+ max_retries=self._max_retries)
+
# Place skipped elements directly on the done queue
- elts = list(elts)
- skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
- wait = [elt for elt in elts if elt not in skip]
+ jobs = [create_job(elt) for elt in elts]
+ skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
+ wait = [job for job in jobs if job not in skip]
self._wait_queue.extend(wait)
self._done_queue.extend(skip)
@@ -189,7 +189,7 @@ class Queue():
#
def dequeue(self):
while self._done_queue:
- yield self._done_queue.popleft()
+ yield self._done_queue.popleft().element
# dequeue_ready()
#
@@ -201,7 +201,10 @@ class Queue():
def dequeue_ready(self):
return any(self._done_queue)
- # process_ready()
+ # pop_ready_jobs()
+ #
+ # Returns:
+ # ([Job]): A list of jobs to run
#
# Process elements in the queue, moving elements which were enqueued
# into the dequeue pool, and processing them if necessary.
@@ -211,46 +214,45 @@ class Queue():
#
 # o Elements which are QueueStatus.WAIT will not be affected
#
- # o Elements which are QueueStatus.READY will be processed
- # and added to the Queue.active_jobs list as a result,
- # given that the scheduler allows the Queue enough tokens
- # for the given queue's job type
- #
# o Elements which are QueueStatus.SKIP will move directly
# to the dequeue pool
#
- def process_ready(self):
- scheduler = self._scheduler
+ # o For Elements which are QueueStatus.READY a Job will be
+ # created and returned to the caller, given that the scheduler
+ # allows the Queue enough resources for the given job
+ #
+ def pop_ready_jobs(self):
unready = []
+ ready = []
- while self._wait_queue and scheduler.get_job_token(self.queue_type):
- element = self._wait_queue.popleft()
+ while self._wait_queue:
+ job = self._wait_queue.popleft()
+ element = job.element
status = self.status(element)
if status == QueueStatus.WAIT:
- scheduler.put_job_token(self.queue_type)
- unready.append(element)
+ unready.append(job)
continue
elif status == QueueStatus.SKIP:
- scheduler.put_job_token(self.queue_type)
- self._done_queue.append(element)
+ self._done_queue.append(job)
self.skipped_elements.append(element)
continue
self.prepare(element)
-
- job = Job(scheduler, element, self.action_name,
- self.process, self._job_done,
- max_retries=self._max_retries)
- scheduler.job_starting(job)
-
- job.spawn()
- self.active_jobs.append(job)
+ ready.append(job)
# These were not ready but were in the beginning, give em
# first priority again next time around
self._wait_queue.extendleft(unready)
+ return ready
+
+ def peek_ready_jobs(self):
+ def ready(job):
+ return self.status(job.element) == QueueStatus.READY
+
+ yield from (job for job in self._wait_queue if ready(job))
+
#####################################################
# Private Methods #
#####################################################
@@ -265,12 +267,16 @@ class Queue():
# job (Job): The job which completed
#
def _update_workspaces(self, element, job):
+ workspace_dict = None
+ if job.child_data:
+ workspace_dict = job.child_data.get('workspace', None)
+
# Handle any workspace modifications now
#
- if job.workspace_dict:
+ if workspace_dict:
context = element._get_context()
workspaces = context.get_workspaces()
- if workspaces.update_workspace(element._get_full_name(), job.workspace_dict):
+ if workspaces.update_workspace(element._get_full_name(), workspace_dict):
try:
workspaces.save_config()
except BstError as e:
@@ -291,17 +297,15 @@ class Queue():
#
def _job_done(self, job, element, success, result):
- # Remove from our jobs
- self.active_jobs.remove(job)
-
- # Update workspaces in the main task before calling any queue implementation
+ # Update values that need to be synchronized in the main task
+ # before calling any queue implementation
self._update_workspaces(element, job)
# Give the result of the job to the Queue implementor,
# and determine if it should be considered as processed
# or skipped.
try:
- processed = self.done(element, result, success)
+ processed = self.done(job, element, result, success)
except BstError as e:
@@ -330,7 +334,7 @@ class Queue():
 # No exception occurred, handle the success/failure state in the normal way
#
if success:
- self._done_queue.append(element)
+ self._done_queue.append(job)
if processed:
self.processed_elements.append(element)
else:
@@ -338,18 +342,22 @@ class Queue():
else:
self.failed_elements.append(element)
- # Give the token for this job back to the scheduler
- # immediately before invoking another round of scheduling
- self._scheduler.put_job_token(self.queue_type)
-
- # Notify frontend
- self._scheduler.job_completed(self, job, success)
-
- self._scheduler.sched()
-
# Convenience wrapper for Queue implementations to send
# a message for the element they are processing
def _message(self, element, message_type, brief, **kwargs):
context = element._get_context()
message = Message(element._get_unique_id(), message_type, brief, **kwargs)
context.message(message)
+
+ def _element_log_path(self, element):
+ project = element._get_project()
+ context = element._get_context()
+
+ key = element._get_display_key()[1]
+ action = self.action_name.lower()
+ logfile = "{key}-{action}.{{pid}}.log".format(key=key, action=action)
+
+ directory = os.path.join(context.logdir, project.name, element.normal_name)
+
+ os.makedirs(directory, exist_ok=True)
+ return os.path.join(directory, logfile)
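
Note the doubled braces in _element_log_path(): the first format() call fills in key and action but leaves a literal {pid} placeholder behind, which the child process later substitutes in _child_logging_enabled(). A quick worked example:

    template = "{key}-{action}.{{pid}}.log".format(key="a1b2c3", action="build")
    # template == "a1b2c3-build.{pid}.log"
    path = template.format(pid=1234)
    # path == "a1b2c3-build.1234.log"
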
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index 3a65f01d0..c7a8f4cc7 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -23,7 +23,8 @@ from ...plugin import _plugin_lookup
from ... import SourceError
# Local imports
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
# A queue which tracks sources
@@ -32,7 +33,7 @@ class TrackQueue(Queue):
action_name = "Track"
complete_name = "Tracked"
- queue_type = QueueType.FETCH
+ resources = [ResourceType.DOWNLOAD]
def process(self, element):
return element._track()
@@ -47,7 +48,7 @@ class TrackQueue(Queue):
return QueueStatus.READY
- def done(self, element, result, success):
+ def done(self, _, element, result, success):
if not success:
return False
diff --git a/buildstream/_scheduler/resources.py b/buildstream/_scheduler/resources.py
new file mode 100644
index 000000000..bbf851b06
--- /dev/null
+++ b/buildstream/_scheduler/resources.py
@@ -0,0 +1,105 @@
+class ResourceType():
+ CACHE = 0
+ DOWNLOAD = 1
+ PROCESS = 2
+ UPLOAD = 3
+
+
+class Resources():
+ def __init__(self, num_builders, num_fetchers, num_pushers):
+ self._max_resources = {
+ ResourceType.CACHE: 1,
+ ResourceType.DOWNLOAD: num_fetchers,
+ ResourceType.PROCESS: num_builders,
+ ResourceType.UPLOAD: num_pushers
+ }
+
+ # Resources jobs are currently using.
+ self._used_resources = {
+ ResourceType.CACHE: 0,
+ ResourceType.DOWNLOAD: 0,
+ ResourceType.PROCESS: 0,
+ ResourceType.UPLOAD: 0
+ }
+
+ # Resources jobs currently want exclusive access to. The set
+ # of jobs that have asked for exclusive access is the value -
+ # this is so that we can avoid scheduling any other jobs until
+ # *all* exclusive jobs that "register interest" have finished
+ # - which avoids starving them of scheduling time.
+ self._exclusive_resources = {
+ ResourceType.CACHE: set(),
+ ResourceType.DOWNLOAD: set(),
+ ResourceType.PROCESS: set(),
+ ResourceType.UPLOAD: set()
+ }
+
+ def clear_job_resources(self, job):
+ for resource in job.exclusive_resources:
+ self._exclusive_resources[resource].remove(hash(job))
+
+ for resource in job.resources:
+ self._used_resources[resource] -= 1
+
+ def reserve_exclusive_resources(self, job):
+ exclusive = job.exclusive_resources
+
+ # The very first thing we do is to register any exclusive
+ # resources this job may want. Even if the job is not yet
+ # allowed to run (because another job is holding the resource
+ # it wants), we can still set this - it just means that any
+ # job *currently* using these resources has to finish first,
+ # and no new jobs wanting these can be launched (except other
+ # exclusive-access jobs).
+ #
+ for resource in exclusive:
+ self._exclusive_resources[resource].add(hash(job))
+
+ def reserve_job_resources(self, job):
+ # First, we check if the job wants to access a resource that
+ # another job wants exclusive access to. If so, it cannot be
+ # scheduled.
+ #
+ # Note that if *both* jobs want this exclusively, we don't
+ # fail yet.
+ #
+ # FIXME: I *think* we can deadlock if two jobs want disjoint
+ # sets of exclusive and non-exclusive resources. This
+ # is currently not possible, but may be worth thinking
+ # about.
+ #
+ for resource in job.resources - job.exclusive_resources:
+ # If our job wants this resource exclusively, we never
+ # check this, so we can get away with not (temporarily)
+ # removing it from the set.
+ if self._exclusive_resources[resource]:
+ return False
+
+ # Now we check if anything is currently using any resources
+ # this job wants exclusively. If so, the job cannot be
+ # scheduled.
+ #
+ # Since jobs that use a resource exclusively are also using
+ # it, this means only one exclusive job can ever be scheduled
+ # at a time, despite being allowed to be part of the exclusive
+ # set.
+ #
+ for exclusive in job.exclusive_resources:
+ if self._used_resources[exclusive] != 0:
+ return False
+
+ # Finally, we check if we have enough of each resource
+ # available. If we don't have enough, the job cannot be
+ # scheduled.
+ for resource in job.resources:
+ if (self._max_resources[resource] > 0 and
+ self._used_resources[resource] >= self._max_resources[resource]):
+ return False
+
+ # Now we register the fact that our job is using the resources
+ # it asked for, and tell the scheduler that it is allowed to
+ # continue.
+ for resource in job.resources:
+ self._used_resources[resource] += 1
+
+ return True
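
Resources is a simple counting reservation scheme: each job asks for a set of resource types, CACHE is capped at one holder, and exclusive access additionally requires the resource to be idle. A hedged usage sketch with a minimal stand-in job object (the real scheduler drives this from _sched()):

    class FakeJob:
        def __init__(self, resources, exclusive_resources=()):
            self.resources = set(resources)
            self.exclusive_resources = set(exclusive_resources)

    res = Resources(num_builders=4, num_fetchers=10, num_pushers=4)
    job = FakeJob({ResourceType.PROCESS})

    res.reserve_exclusive_resources(job)  # registers exclusive interest (none here)
    if res.reserve_job_resources(job):    # True while fewer than 4 PROCESS jobs run
        print("job admitted")             # the real _sched() calls job.spawn() here
        res.clear_job_resources(job)      # release the slots when the job completes
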
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 7bfbc958e..bc182db32 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -21,12 +21,13 @@
# System imports
import os
import asyncio
+from itertools import chain
import signal
import datetime
from contextlib import contextmanager
# Local imports
-from .queues import QueueType
+from .resources import Resources
# A decent return code for Scheduler.run()
@@ -69,6 +70,8 @@ class Scheduler():
#
# Public members
#
+ self.active_jobs = [] # Jobs currently being run in the scheduler
+ self.waiting_jobs = [] # Jobs waiting for resources
self.queues = None # Exposed for the frontend to print summaries
self.context = context # The Context object shared with Queues
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
@@ -90,13 +93,9 @@ class Scheduler():
self._suspendtime = None
self._queue_jobs = True # Whether we should continue to queue jobs
- # Initialize task tokens with the number allowed by
- # the user configuration
- self._job_tokens = {
- QueueType.FETCH: context.sched_fetchers,
- QueueType.BUILD: context.sched_builders,
- QueueType.PUSH: context.sched_pushers
- }
+ self._resources = Resources(context.sched_builders,
+ context.sched_fetchers,
+ context.sched_pushers)
# run()
#
@@ -129,7 +128,7 @@ class Scheduler():
self._connect_signals()
# Run the queues
- self.sched()
+ self._schedule_queue_jobs()
self.loop.run_forever()
self.loop.close()
@@ -209,18 +208,74 @@ class Scheduler():
starttime = timenow
return timenow - starttime
- # sched()
+ # schedule_jobs()
+ #
+ # Args:
+ # jobs ([Job]): A list of jobs to schedule
+ #
+ # Schedule Job objects for the scheduler to run. Scheduled jobs
+ # will be run as soon as possible, provided sufficient resources
+ # are available for them to run.
+ #
+ def schedule_jobs(self, jobs):
+ for job in jobs:
+ self.waiting_jobs.append(job)
+
+ # job_completed():
+ #
+ # Called when a Job completes
+ #
+ # Args:
+ # queue (Queue): The Queue holding a complete job
+ # job (Job): The completed Job
+ # success (bool): Whether the Job completed with a success status
+ #
+ def job_completed(self, job, success):
+ self._resources.clear_job_resources(job)
+ self.active_jobs.remove(job)
+ self._job_complete_callback(job, success)
+ self._schedule_queue_jobs()
+ self._sched()
+
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+
+ # _sched()
#
# The main driving function of the scheduler, it will be called
- # automatically when Scheduler.run() is called initially, and needs
- # to be called whenever a job can potentially be scheduled, usually
- # when a Queue completes handling of a job.
+ # automatically when Scheduler.run() is called initially,
+ #
+ def _sched(self):
+ for job in self.waiting_jobs:
+ self._resources.reserve_exclusive_resources(job)
+
+ # Iterate over a copy, since spawned jobs are removed below
+ for job in list(self.waiting_jobs):
+ if not self._resources.reserve_job_resources(job):
+ continue
+
+ job.spawn()
+ self.waiting_jobs.remove(job)
+ self.active_jobs.append(job)
+
+ if self._job_start_callback:
+ self._job_start_callback(job)
+
+ # If nothing's ticking, time to bail out
+ if not self.active_jobs and not self.waiting_jobs:
+ self.loop.stop()
+
+ # _schedule_queue_jobs()
#
- # This will process the Queues and pull elements through the Queues
+ # Ask the queues what jobs they want to schedule and schedule
+ # them. This is done here so we can ask for new jobs when jobs
+ # from previous queues become available.
+ #
+ # This will process the Queues, pull elements through the Queues
# and process anything that is ready.
#
- def sched(self):
-
+ def _schedule_queue_jobs(self):
+ ready = []
process_queues = True
while self._queue_jobs and process_queues:
@@ -233,90 +288,29 @@ class Scheduler():
# Dequeue processed elements for the next queue
elements = list(queue.dequeue())
- elements = list(elements)
# Kickoff whatever processes can be processed at this time
#
- # We start by queuing from the last queue first, because we want to
- # give priority to queues later in the scheduling process in the case
- # that multiple queues share the same token type.
+ # We start by queuing from the last queue first, because
+ # we want to give priority to queues later in the
+ # scheduling process in the case that multiple queues
+ # share the same resources.
#
- # This avoids starvation situations where we dont move on to fetch
- # tasks for elements which failed to pull, and thus need all the pulls
- # to complete before ever starting a build
- for queue in reversed(self.queues):
- queue.process_ready()
-
- # process_ready() may have skipped jobs, adding them to the done_queue.
- # Pull these skipped elements forward to the next queue and process them.
+ # This avoids starvation situations where we don't move on
+ # to fetch tasks for elements which failed to pull, and
+ # thus need all the pulls to complete before ever starting
+ # a build
+ ready.extend(chain.from_iterable(
+ queue.pop_ready_jobs() for queue in reversed(self.queues)
+ ))
+
+ # pop_ready_jobs() may have skipped jobs, adding them to
+ # the done_queue. Pull these skipped elements forward to
+ # the next queue and process them.
process_queues = any(q.dequeue_ready() for q in self.queues)
- # If nothings ticking, time to bail out
- ticking = 0
- for queue in self.queues:
- ticking += len(queue.active_jobs)
-
- if ticking == 0:
- self.loop.stop()
-
- # get_job_token():
- #
- # Used by the Queue object to obtain a token for
- # processing a Job, if a Queue does not receive a token
- # then it must wait until a later time in order to
- # process pending jobs.
- #
- # Args:
- # queue_type (QueueType): The type of token to obtain
- #
- # Returns:
- # (bool): Whether a token was handed out or not
- #
- def get_job_token(self, queue_type):
- if self._job_tokens[queue_type] > 0:
- self._job_tokens[queue_type] -= 1
- return True
- return False
-
- # put_job_token():
- #
- # Return a job token to the scheduler. Tokens previously
- # received with get_job_token() must be returned to
- # the scheduler once the associated job is complete.
- #
- # Args:
- # queue_type (QueueType): The type of token to obtain
- #
- def put_job_token(self, queue_type):
- self._job_tokens[queue_type] += 1
-
- # job_starting():
- #
- # Called by the Queue when starting a Job
- #
- # Args:
- # job (Job): The starting Job
- #
- def job_starting(self, job):
- if self._job_start_callback:
- self._job_start_callback(job.element, job.action_name)
-
- # job_completed():
- #
- # Called by the Queue when a Job completes
- #
- # Args:
- # queue (Queue): The Queue holding a complete job
- # job (Job): The completed Job
- # success (bool): Whether the Job completed with a success status
- #
- def job_completed(self, queue, job, success):
- if self._job_complete_callback:
- self._job_complete_callback(job.element, queue, job.action_name, success)
-
- #######################################################
- # Local Private Methods #
- #######################################################
+ self.schedule_jobs(ready)
+ self._sched()
# _suspend_jobs()
#
@@ -326,9 +320,8 @@ class Scheduler():
if not self.suspended:
self._suspendtime = datetime.datetime.now()
self.suspended = True
- for queue in self.queues:
- for job in queue.active_jobs:
- job.suspend()
+ for job in self.active_jobs:
+ job.suspend()
# _resume_jobs()
#
@@ -336,9 +329,8 @@ class Scheduler():
#
def _resume_jobs(self):
if self.suspended:
- for queue in self.queues:
- for job in queue.active_jobs:
- job.resume()
+ for job in self.active_jobs:
+ job.resume()
self.suspended = False
self._starttime += (datetime.datetime.now() - self._suspendtime)
self._suspendtime = None
@@ -401,19 +393,18 @@ class Scheduler():
wait_limit = 20.0
# First tell all jobs to terminate
- for queue in self.queues:
- for job in queue.active_jobs:
- job.terminate()
+ for job in self.active_jobs:
+ job.terminate()
# Now wait for them to really terminate
- for queue in self.queues:
- for job in queue.active_jobs:
- elapsed = datetime.datetime.now() - wait_start
- timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
- if not job.terminate_wait(timeout):
- job.kill()
-
- self.loop.stop()
+ for job in self.active_jobs:
+ elapsed = datetime.datetime.now() - wait_start
+ timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
+ if not job.terminate_wait(timeout):
+ job.kill()
+
+ # Clear out the waiting jobs
+ self.waiting_jobs = []
# Regular timeout for driving status in the UI
def _tick(self):
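
Taken together, the token accounting is replaced by a two-phase loop: queues hand ready ElementJobs to schedule_jobs(), and _sched() spawns whatever the Resources bookkeeping admits. A toy, self-contained simulation of that admission discipline, reusing the FakeJob stand-in from the resources.py sketch above (the real loop is asyncio-driven and calls job.spawn()):

    waiting = [FakeJob({ResourceType.PROCESS}) for _ in range(6)]
    active = []

    res = Resources(num_builders=4, num_fetchers=10, num_pushers=4)
    for job in list(waiting):
        res.reserve_exclusive_resources(job)  # phase 1: register exclusive interest
    for job in list(waiting):
        if res.reserve_job_resources(job):    # phase 2: admit whatever fits
            waiting.remove(job)
            active.append(job)

    assert len(active) == 4 and len(waiting) == 2  # only 4 builder slots exist
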