diff options
author | Tristan Maat <tristan.maat@codethink.co.uk> | 2018-07-16 15:31:55 +0100 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-07-18 14:45:59 +0900 |
commit | 249256346613dea52ad32d945dbd303f54bd245e (patch) | |
tree | 2a977a806bdc0cd813b50248fb3717d7f841ba88 | |
parent | 4f9da15df4691fe582c0d05ccc61a7927dee55ca (diff) | |
download | buildstream-249256346613dea52ad32d945dbd303f54bd245e.tar.gz |
Make Jobs abstract and element-independent
-rw-r--r-- | buildstream/_frontend/app.py | 36 | ||||
-rw-r--r-- | buildstream/_frontend/status.py | 28 | ||||
-rw-r--r-- | buildstream/_scheduler/__init__.py | 3 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/__init__.py | 1 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/elementjob.py | 163 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/job.py (renamed from buildstream/_scheduler/job.py) | 372 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/__init__.py | 2 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/buildqueue.py | 7 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/fetchqueue.py | 7 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pullqueue.py | 7 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pushqueue.py | 7 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 136 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/trackqueue.py | 7 | ||||
-rw-r--r-- | buildstream/_scheduler/resources.py | 105 | ||||
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 213 |
15 files changed, 718 insertions, 376 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py index 4675b0eb0..3cf2f5180 100644 --- a/buildstream/_frontend/app.py +++ b/buildstream/_frontend/app.py @@ -40,6 +40,7 @@ from .._message import Message, MessageType, unconditional_messages from .._stream import Stream from .._versions import BST_FORMAT_VERSION from .. import _yaml +from .._scheduler import ElementJob # Import frontend assets from . import Profile, LogLine, Status @@ -492,30 +493,37 @@ class App(): def _tick(self, elapsed): self._maybe_render_status() - def _job_started(self, element, action_name): - self._status.add_job(element, action_name) + def _job_started(self, job): + self._status.add_job(job) self._maybe_render_status() - def _job_completed(self, element, queue, action_name, success): - self._status.remove_job(element, action_name) + def _job_completed(self, job, success): + self._status.remove_job(job) self._maybe_render_status() # Dont attempt to handle a failure if the user has already opted to # terminate if not success and not self.stream.terminated: - # Get the last failure message for additional context - failure = self._fail_messages.get(element._get_unique_id()) + if isinstance(job, ElementJob): + element = job.element + queue = job.queue - # XXX This is dangerous, sometimes we get the job completed *before* - # the failure message reaches us ?? - if not failure: - self._status.clear() - click.echo("\n\n\nBUG: Message handling out of sync, " + - "unable to retrieve failure message for element {}\n\n\n\n\n" - .format(element), err=True) + # Get the last failure message for additional context + failure = self._fail_messages.get(element._get_unique_id()) + + # XXX This is dangerous, sometimes we get the job completed *before* + # the failure message reaches us ?? + if not failure: + self._status.clear() + click.echo("\n\n\nBUG: Message handling out of sync, " + + "unable to retrieve failure message for element {}\n\n\n\n\n" + .format(element), err=True) + else: + self._handle_failure(element, queue, failure) else: - self._handle_failure(element, queue, failure) + click.echo("\nTerminating all jobs\n", err=True) + self.stream.terminate() def _handle_failure(self, element, queue, failure): diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py index 3f66e009a..51b28d9cf 100644 --- a/buildstream/_frontend/status.py +++ b/buildstream/_frontend/status.py @@ -21,6 +21,7 @@ from blessings import Terminal # Import a widget internal for formatting time codes from .widget import TimeCode +from .._scheduler import ElementJob # Status() @@ -77,9 +78,9 @@ class Status(): # element (Element): The element of the job to track # action_name (str): The action name for this job # - def add_job(self, element, action_name): + def add_job(self, job): elapsed = self._stream.elapsed_time - job = _StatusJob(self._context, element, action_name, self._content_profile, self._format_profile, elapsed) + job = _StatusJob(self._context, job, self._content_profile, self._format_profile, elapsed) self._jobs.append(job) self._need_alloc = True @@ -91,7 +92,13 @@ class Status(): # element (Element): The element of the job to track # action_name (str): The action name for this job # - def remove_job(self, element, action_name): + def remove_job(self, job): + action_name = job.action_name + if not isinstance(job, ElementJob): + element = None + else: + element = job.element + self._jobs = [ job for job in self._jobs if not (job.element is element and @@ -358,15 +365,19 @@ class _StatusHeader(): # # Args: # context (Context): The Context -# element (Element): The element being processed -# action_name (str): The name of the action +# job (Job): The job being processed # content_profile (Profile): Formatting profile for content text # format_profile (Profile): Formatting profile for formatting text # elapsed (datetime): The offset into the session when this job is created # class _StatusJob(): - def __init__(self, context, element, action_name, content_profile, format_profile, elapsed): + def __init__(self, context, job, content_profile, format_profile, elapsed): + action_name = job.action_name + if not isinstance(job, ElementJob): + element = None + else: + element = job.element # # Public members @@ -374,6 +385,7 @@ class _StatusJob(): self.element = element # The Element self.action_name = action_name # The action name self.size = None # The number of characters required to render + self.full_name = element._get_full_name() if element else action_name # # Private members @@ -386,7 +398,7 @@ class _StatusJob(): # Calculate the size needed to display self.size = 10 # Size of time code with brackets self.size += len(action_name) - self.size += len(element._get_full_name()) + self.size += len(self.full_name) self.size += 3 # '[' + ':' + ']' # render() @@ -403,7 +415,7 @@ class _StatusJob(): self._format_profile.fmt(']') # Add padding after the display name, before terminating ']' - name = self.element._get_full_name() + (' ' * padding) + name = self.full_name + (' ' * padding) text += self._format_profile.fmt('[') + \ self._content_profile.fmt(self.action_name) + \ self._format_profile.fmt(':') + \ diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py index a53a133c2..b6e3eeb94 100644 --- a/buildstream/_scheduler/__init__.py +++ b/buildstream/_scheduler/__init__.py @@ -17,7 +17,7 @@ # Authors: # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> -from .queues import Queue, QueueStatus, QueueType +from .queues import Queue, QueueStatus from .queues.fetchqueue import FetchQueue from .queues.trackqueue import TrackQueue @@ -26,3 +26,4 @@ from .queues.pushqueue import PushQueue from .queues.pullqueue import PullQueue from .scheduler import Scheduler, SchedStatus +from .jobs import ElementJob diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py new file mode 100644 index 000000000..0030f5c97 --- /dev/null +++ b/buildstream/_scheduler/jobs/__init__.py @@ -0,0 +1 @@ +from .elementjob import ElementJob diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py new file mode 100644 index 000000000..e49dfb12f --- /dev/null +++ b/buildstream/_scheduler/jobs/elementjob.py @@ -0,0 +1,163 @@ +# Copyright (C) 2018 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Author: +# Tristan Daniël Maat <tristan.maat@codethink.co.uk> +# +import os +from contextlib import contextmanager + +from ruamel import yaml + +from ..._message import Message, MessageType +from ...plugin import _plugin_lookup +from ... import _signals + +from .job import Job + + +# ElementJob() +# +# A job to run an element's commands. When this job is spawned +# `action_cb` will be called, and when it completes `complete_cb` will +# be called. +# +# Args: +# scheduler (Scheduler): The scheduler +# action_name (str): The queue action name +# max_retries (int): The maximum number of retries +# action_cb (callable): The function to execute on the child +# complete_cb (callable): The function to execute when the job completes +# element (Element): The element to work on +# kwargs: Remaining Job() constructor arguments +# +# Here is the calling signature of the action_cb: +# +# action_cb(): +# +# This function will be called in the child task +# +# Args: +# element (Element): The element passed to the Job() constructor +# +# Returns: +# (object): Any abstract simple python object, including a string, int, +# bool, list or dict, this must be a simple serializable object. +# +# Here is the calling signature of the complete_cb: +# +# complete_cb(): +# +# This function will be called when the child task completes +# +# Args: +# job (Job): The job object which completed +# element (Element): The element passed to the Job() constructor +# success (bool): True if the action_cb did not raise an exception +# result (object): The deserialized object returned by the `action_cb`, or None +# if `success` is False +# +class ElementJob(Job): + def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs): + super().__init__(*args, **kwargs) + self.queue = queue + self._element = element + self._action_cb = action_cb # The action callable function + self._complete_cb = complete_cb # The complete callable function + + @property + def element(self): + return self._element + + def child_process(self): + return self._action_cb(self._element) + + def parent_complete(self, success, result): + self._complete_cb(self, self._element, success, self._result) + + @contextmanager + def child_logging_enabled(self, logfile): + self._logfile = logfile.format(pid=os.getpid()) + + with open(self._logfile, 'a') as log: + # Write one last line to the log and flush it to disk + def flush_log(): + + # If the process currently had something happening in the I/O stack + # then trying to reenter the I/O stack will fire a runtime error. + # + # So just try to flush as well as we can at SIGTERM time + try: + # FIXME: Better logging + + log.write('\n\nAction {} for element {} forcefully terminated\n' + .format(self.action_name, self._element.name)) + log.flush() + except RuntimeError: + os.fsync(log.fileno()) + + self._element._set_log_handle(log) + with _signals.terminator(flush_log): + self._print_start_message(self._element, self._logfile) + yield self._logfile + self._element._set_log_handle(None) + self._logfile = None + + def message(self, message_type, message, **kwargs): + args = dict(kwargs) + args['scheduler'] = True + self._scheduler.context.message( + Message(self._element._get_unique_id(), + message_type, + message, + **args)) + + def _print_start_message(self, element, logfile): + self.message(MessageType.START, self.action_name, logfile=logfile) + + # Print the element's environment at the beginning of any element's log file. + # + # This should probably be omitted for non-build tasks but it's harmless here + elt_env = element.get_environment() + env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True) + self.message(MessageType.LOG, + "Build environment for element {}".format(element.name), + detail=env_dump, logfile=logfile) + + def child_log(self, message): + # Tag them on the way out the door... + message.action_name = self.action_name + message.task_id = self._element._get_unique_id() + + # Use the plugin for the task for the output, not a plugin + # which might be acting on behalf of the task + plugin = _plugin_lookup(message.task_id) + + with plugin._output_file() as output: + message_text = self.decorate_message(message, '[{}]'.format(plugin.name)) + output.write('{}\n'.format(message_text)) + output.flush() + + return message + + def child_process_data(self): + data = {} + + workspace = self._element._get_workspace() + + if workspace is not None: + data['workspace'] = workspace.to_dict() + + return data diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/jobs/job.py index cc350649e..dec3a0c11 100644 --- a/buildstream/_scheduler/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -1,5 +1,5 @@ # -# Copyright (C) 2016 Codethink Limited +# Copyright (C) 2018 Codethink Limited # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -17,6 +17,7 @@ # Authors: # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> # Jürg Billeter <juerg.billeter@codethink.co.uk> +# Tristan Maat <tristan.maat@codethink.co.uk> # System imports import os @@ -26,20 +27,21 @@ import datetime import traceback import asyncio import multiprocessing -from ruamel import yaml +from contextlib import contextmanager + +import psutil # BuildStream toplevel imports -from .._exceptions import BstError, set_last_task_error -from .._message import Message, MessageType, unconditional_messages -from ..plugin import _plugin_lookup -from .. import _signals, utils +from ..._exceptions import ImplError, BstError, set_last_task_error +from ..._message import MessageType, unconditional_messages +from ... import _signals, utils # Used to distinguish between status messages and return values class Envelope(): def __init__(self, message_type, message): - self.message_type = message_type - self.message = message + self._message_type = message_type + self._message = message # Process class that doesn't call waitpid on its own. @@ -54,54 +56,49 @@ class Process(multiprocessing.Process): # Job() # # The Job object represents a parallel task, when calling Job.spawn(), -# the given `action_cb` will be called in parallel to the calling process, -# and `complete_cb` will be called with the action result in the calling -# process when the job completes. +# the given `Job.child_process()` will be called in parallel to the +# calling process, and `Job.parent_complete()` will be called with the +# action result in the calling process when the job completes. # # Args: # scheduler (Scheduler): The scheduler -# element (Element): The element to operate on # action_name (str): The queue action name -# action_cb (callable): The action function -# complete_cb (callable): The function to call when complete +# logfile (str): A template string that points to the logfile +# that should be used - should contain {pid}. +# resources (iter(ResourceType)) - A set of resources this job +# wants to use. +# exclusive_resources (iter(ResourceType)) - A set of resources +# this job wants to use +# exclusively. # max_retries (int): The maximum number of retries # -# Here is the calling signature of the action_cb: -# -# action_cb(): -# -# This function will be called in the child task -# -# Args: -# element (Element): The element passed to the Job() constructor -# -# Returns: -# (object): Any abstract simple python object, including a string, int, -# bool, list or dict, this must be a simple serializable object. -# -# Here is the calling signature of the complete_cb: -# -# complete_cb(): -# -# This function will be called when the child task completes -# -# Args: -# job (Job): The job object which completed -# element (Element): The element passed to the Job() constructor -# success (bool): True if the action_cb did not raise an exception -# result (object): The deserialized object returned by the `action_cb`, or None -# if `success` is False -# class Job(): - def __init__(self, scheduler, element, action_name, action_cb, complete_cb, *, max_retries=0): + def __init__(self, scheduler, action_name, logfile, *, + resources=None, exclusive_resources=None, max_retries=0): + + if resources is None: + resources = set() + else: + resources = set(resources) + if exclusive_resources is None: + exclusive_resources = set() + else: + exclusive_resources = set(resources) + + assert exclusive_resources <= resources, "All exclusive resources must also be resources!" # # Public members # - self.element = element # The element we're processing self.action_name = action_name # The action name for the Queue - self.workspace_dict = None # A serialized Workspace object, after any modifications + self.child_data = None # Data to be sent to the main process + + # The resources this job wants to access + self.resources = resources + # Resources this job needs to access exclusively, i.e., no + # other job should be allowed to access them + self.exclusive_resources = exclusive_resources # # Private members @@ -110,13 +107,12 @@ class Job(): self._queue = multiprocessing.Queue() # A message passing queue self._process = None # The Process object self._watcher = None # Child process watcher - self._action_cb = action_cb # The action callable function - self._complete_cb = complete_cb # The complete callable function self._listening = False # Whether the parent is currently listening self._suspended = False # Whether this job is currently suspended self._max_retries = max_retries # Maximum number of automatic retries self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs + self._logfile = logfile # spawn() # @@ -173,8 +169,7 @@ class Job(): # First resume the job if it's suspended self.resume(silent=True) - self._message(self.element, MessageType.STATUS, - "{} terminating".format(self.action_name)) + self.message(MessageType.STATUS, "{} terminating".format(self.action_name)) # Make sure there is no garbage on the queue self._parent_stop_listening() @@ -205,9 +200,15 @@ class Job(): def kill(self): # Force kill - self._message(self.element, MessageType.WARN, - "{} did not terminate gracefully, killing".format(self.action_name)) - utils._kill_process_tree(self._process.pid) + self.message(MessageType.WARN, + "{} did not terminate gracefully, killing".format(self.action_name)) + + try: + utils._kill_process_tree(self._process.pid) + # This can happen if the process died of its own accord before + # we try to kill it + except psutil.NoSuchProcess: + return # suspend() # @@ -215,8 +216,8 @@ class Job(): # def suspend(self): if not self._suspended: - self._message(self.element, MessageType.STATUS, - "{} suspending".format(self.action_name)) + self.message(MessageType.STATUS, + "{} suspending".format(self.action_name)) try: # Use SIGTSTP so that child processes may handle and propagate @@ -240,13 +241,107 @@ class Job(): def resume(self, silent=False): if self._suspended: if not silent: - self._message(self.element, MessageType.STATUS, - "{} resuming".format(self.action_name)) + self.message(MessageType.STATUS, + "{} resuming".format(self.action_name)) os.kill(self._process.pid, signal.SIGCONT) self._suspended = False ####################################################### + # Abstract Methods # + ####################################################### + + # parent_complete() + # + # This will be executed after the job finishes, and is expected to + # pass the result to the main thread. + # + # Args: + # success (bool): Whether the job was successful. + # result (any): The result returned by child_process(). + # + def parent_complete(self, success, result): + raise ImplError("Job '{kind}' does not implement parent_complete()" + .format(kind=type(self).__name__)) + + # child_process() + # + # This will be executed after fork(), and is intended to perform + # the job's task. + # + # Returns: + # (any): A (simple!) object to be returned to the main thread + # as the result. + # + def child_process(self): + raise ImplError("Job '{kind}' does not implement child_process()" + .format(kind=type(self).__name__)) + + # child_logging_enabled() + # + # Start the log for this job. This function will be given a + # template string for the path to a log file - this will contain + # "{pid}", which should be replaced with the current process' + # PID. (i.e., call something like `logfile.format(pid=os.getpid())`). + # + # Args: + # logfile (str): A template string that points to the logfile + # that should be used - replace {pid} first. + # + # Yields: + # (str) The path to the logfile with {pid} replaced. + # + @contextmanager + def child_logging_enabled(self, logfile): + raise ImplError("Job '{kind}' does not implement child_logging_enabled()" + .format(kind=type(self).__name__)) + + # message(): + # + # Sends a message to the frontend + # + # Args: + # message_type (MessageType): The type of message to send + # message (str): The message + # kwargs: Remaining Message() constructor arguments + # + def message(self, message_type, message, **kwargs): + raise ImplError("Job '{kind}' does not implement message()" + .format(kind=type(self).__name__)) + + # child_process_data() + # + # Abstract method to retrieve additional data that should be + # returned to the parent process. Note that the job result is + # retrieved independently. + # + # Values can later be retrieved in Job.child_data. + # + # Returns: + # (dict) A dict containing values to be reported to the main process + # + def child_process_data(self): + return {} + + # child_log() + # + # Log a message returned by the frontend's main message handler + # and return it to the main process. + # + # This method is also expected to add process-specific information + # to the message (notably, action_name and task_id). + # + # Arguments: + # message (str): The message to log + # + # Returns: + # message (Message): A message object + # + def child_log(self, message): + raise ImplError("Job '{kind}' does not implement child_log()" + .format(kind=type(self).__name__)) + + ####################################################### # Local Private Methods # ####################################################### # @@ -258,24 +353,41 @@ class Job(): # ####################################################### - # _message(): + # decorate_message() # - # Sends a message to the frontend + # Format a message so that it can be used nicely for logging + # purposes. This will prepend a time code and add other + # information to help determine what happened. # # Args: - # plugin (Plugin): The plugin to send a message for - # message_type (MessageType): The type of message to send - # message (str): The message - # kwargs: Remaining Message() constructor arguments + # message (Message) - The message to create a text from. + # name (str) - A name for the executing context. # - def _message(self, plugin, message_type, message, **kwargs): - args = dict(kwargs) - args['scheduler'] = True - self._scheduler.context.message( - Message(plugin._get_unique_id(), - message_type, - message, - **args)) + # Returns: + # (str) The text to log. + # + def decorate_message(self, message, name): + INDENT = " " + EMPTYTIME = "--:--:--" + template = "[{timecode: <8}] {type: <7} {name: <15}: {message}" + + detail = '' + if message.detail is not None: + template += "\n\n{detail}" + detail = message.detail.rstrip('\n') + detail = INDENT + INDENT.join(detail.splitlines(True)) + + timecode = EMPTYTIME + if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): + hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2) + minutes, seconds = divmod(remainder, 60) + timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) + + return template.format(timecode=timecode, + type=message.message_type.upper(), + name=name, + message=message.message, + detail=detail) # _child_action() # @@ -286,7 +398,7 @@ class Job(): # def _child_action(self, queue): - element = self.element + logfile = self._logfile # This avoids some SIGTSTP signals from grandchildren # getting propagated up to the master process @@ -322,35 +434,24 @@ class Job(): # Time, log and and run the action function # with _signals.suspendable(stop_time, resume_time), \ - element._logging_enabled(self.action_name) as filename: - - self._message(element, MessageType.START, self.action_name, logfile=filename) - - # Print the element's environment at the beginning of any element's log file. - # - # This should probably be omitted for non-build tasks but it's harmless here - elt_env = element.get_environment() - env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True) - self._message(element, MessageType.LOG, - "Build environment for element {}".format(element.name), - detail=env_dump, logfile=filename) + self.child_logging_enabled(logfile) as filename: try: # Try the task action - result = self._action_cb(element) + result = self.child_process() except BstError as e: elapsed = datetime.datetime.now() - starttime if self._tries <= self._max_retries: - self._message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self._tries), - elapsed=elapsed) + self.message(MessageType.FAIL, + "Try #{} failed, retrying".format(self._tries), + elapsed=elapsed) else: - self._message(element, MessageType.FAIL, str(e), - elapsed=elapsed, detail=e.detail, - logfile=filename, sandbox=e.sandbox) + self.message(MessageType.FAIL, str(e), + elapsed=elapsed, detail=e.detail, + logfile=filename, sandbox=e.sandbox) - # Report changes in the workspace, even if there was a handled failure - self._child_send_workspace() + self._queue.put(Envelope('child_data', self.child_process_data())) # Report the exception to the parent (for internal testing purposes) self._child_send_error(e) @@ -364,19 +465,20 @@ class Job(): # elapsed = datetime.datetime.now() - starttime detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) - self._message(element, MessageType.BUG, self.action_name, - elapsed=elapsed, detail=detail, - logfile=filename) + + self.message(MessageType.BUG, self.action_name, + elapsed=elapsed, detail=detail, + logfile=filename) self._child_shutdown(1) else: # No exception occurred in the action - self._child_send_workspace() + self._queue.put(Envelope('child_data', self.child_process_data())) self._child_send_result(result) elapsed = datetime.datetime.now() - starttime - self._message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed, - logfile=filename) + self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, + logfile=filename) # Shutdown needs to stay outside of the above context manager, # make sure we dont try to handle SIGTERM while the process @@ -419,16 +521,6 @@ class Job(): envelope = Envelope('result', result) self._queue.put(envelope) - # _child_send_workspace() - # - # Sends the serialized workspace through the message queue, if any - # - def _child_send_workspace(self): - workspace = self.element._get_workspace() - if workspace: - envelope = Envelope('workspace', workspace.to_dict()) - self._queue.put(envelope) - # _child_shutdown() # # Shuts down the child process by cleaning up and exiting the process @@ -440,44 +532,6 @@ class Job(): self._queue.close() sys.exit(exit_code) - # _child_log() - # - # Logs a Message to the process's dedicated log file - # - # Args: - # plugin (Plugin): The plugin to log for - # message (Message): The message to log - # - def _child_log(self, plugin, message): - - with plugin._output_file() as output: - INDENT = " " - EMPTYTIME = "--:--:--" - - name = '[' + plugin.name + ']' - - fmt = "[{timecode: <8}] {type: <7} {name: <15}: {message}" - detail = '' - if message.detail is not None: - fmt += "\n\n{detail}" - detail = message.detail.rstrip('\n') - detail = INDENT + INDENT.join(detail.splitlines(True)) - - timecode = EMPTYTIME - if message.message_type in (MessageType.SUCCESS, MessageType.FAIL): - hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 * 60) - minutes, seconds = divmod(remainder, 60) - timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds) - - message_text = fmt.format(timecode=timecode, - type=message.message_type.upper(), - name=name, - message=message.message, - detail=detail) - - output.write('{}\n'.format(message_text)) - output.flush() - # _child_message_handler() # # A Context delegate for handling messages, this replaces the @@ -491,16 +545,8 @@ class Job(): # def _child_message_handler(self, message, context): - # Tag them on the way out the door... - message.action_name = self.action_name - message.task_id = self.element._get_unique_id() - - # Use the plugin for the task for the output, not a plugin - # which might be acting on behalf of the task - plugin = _plugin_lookup(message.task_id) - # Log first - self._child_log(plugin, message) + message = self.child_log(message) if message.message_type == MessageType.FAIL and self._tries <= self._max_retries: # Job will be retried, display failures as warnings in the frontend @@ -540,7 +586,8 @@ class Job(): self.spawn() return - self._complete_cb(self, self.element, returncode == 0, self._result) + self.parent_complete(returncode == 0, self._result) + self._scheduler.job_completed(self, returncode == 0) # _parent_process_envelope() # @@ -557,21 +604,22 @@ class Job(): if not self._listening: return - if envelope.message_type == 'message': + if envelope._message_type == 'message': # Propagate received messages from children # back through the context. - self._scheduler.context.message(envelope.message) - elif envelope.message_type == 'error': + self._scheduler.context.message(envelope._message) + elif envelope._message_type == 'error': # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state # is currently managed in _exceptions.py - set_last_task_error(envelope.message['domain'], - envelope.message['reason']) - elif envelope.message_type == 'result': + set_last_task_error(envelope._message['domain'], + envelope._message['reason']) + elif envelope._message_type == 'result': assert self._result is None - self._result = envelope.message - elif envelope.message_type == 'workspace': - self.workspace_dict = envelope.message + self._result = envelope._message + elif envelope._message_type == 'child_data': + # If we retry a job, we assign a new value to this + self.child_data = envelope._message else: raise Exception() diff --git a/buildstream/_scheduler/queues/__init__.py b/buildstream/_scheduler/queues/__init__.py index b9acef18c..3b2293919 100644 --- a/buildstream/_scheduler/queues/__init__.py +++ b/buildstream/_scheduler/queues/__init__.py @@ -1 +1 @@ -from .queue import Queue, QueueStatus, QueueType +from .queue import Queue, QueueStatus diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index 50ba312ff..7f8ac9e8f 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -18,7 +18,8 @@ # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> # Jürg Billeter <juerg.billeter@codethink.co.uk> -from . import Queue, QueueStatus, QueueType +from . import Queue, QueueStatus +from ..resources import ResourceType # A queue which assembles elements @@ -27,7 +28,7 @@ class BuildQueue(Queue): action_name = "Build" complete_name = "Built" - queue_type = QueueType.BUILD + resources = [ResourceType.PROCESS] def process(self, element): element._assemble() @@ -50,7 +51,7 @@ class BuildQueue(Queue): return QueueStatus.READY - def done(self, element, result, success): + def done(self, job, element, result, success): if success: # Inform element in main process that assembly is done diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index bdff15667..265890b7a 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -22,7 +22,8 @@ from ... import Consistency # Local imports -from . import Queue, QueueStatus, QueueType +from . import Queue, QueueStatus +from ..resources import ResourceType # A queue which fetches element sources @@ -31,7 +32,7 @@ class FetchQueue(Queue): action_name = "Fetch" complete_name = "Fetched" - queue_type = QueueType.FETCH + resources = [ResourceType.DOWNLOAD] def __init__(self, scheduler, skip_cached=False): super().__init__(scheduler) @@ -66,7 +67,7 @@ class FetchQueue(Queue): return QueueStatus.READY - def done(self, element, result, success): + def done(self, _, element, result, success): if not success: return False diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py index b4f5b0d73..efaa59ef3 100644 --- a/buildstream/_scheduler/queues/pullqueue.py +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -19,7 +19,8 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> # Local imports -from . import Queue, QueueStatus, QueueType +from . import Queue, QueueStatus +from ..resources import ResourceType # A queue which pulls element artifacts @@ -28,7 +29,7 @@ class PullQueue(Queue): action_name = "Pull" complete_name = "Pulled" - queue_type = QueueType.FETCH + resources = [ResourceType.UPLOAD] def process(self, element): # returns whether an artifact was downloaded or not @@ -51,7 +52,7 @@ class PullQueue(Queue): else: return QueueStatus.SKIP - def done(self, element, result, success): + def done(self, _, element, result, success): if not success: return False diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py index 624eefd1d..568e053d6 100644 --- a/buildstream/_scheduler/queues/pushqueue.py +++ b/buildstream/_scheduler/queues/pushqueue.py @@ -19,7 +19,8 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> # Local imports -from . import Queue, QueueStatus, QueueType +from . import Queue, QueueStatus +from ..resources import ResourceType # A queue which pushes element artifacts @@ -28,7 +29,7 @@ class PushQueue(Queue): action_name = "Push" complete_name = "Pushed" - queue_type = QueueType.PUSH + resources = [ResourceType.UPLOAD] def process(self, element): # returns whether an artifact was uploaded or not @@ -40,7 +41,7 @@ class PushQueue(Queue): return QueueStatus.READY - def done(self, element, result, success): + def done(self, _, element, result, success): if not success: return False diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index d0c482802..8ca3ac063 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -19,32 +19,20 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> # System imports +import os from collections import deque from enum import Enum import traceback # Local imports -from ..job import Job +from ..jobs import ElementJob +from ..resources import ResourceType # BuildStream toplevel imports from ..._exceptions import BstError, set_last_task_error from ..._message import Message, MessageType -# Indicates the kind of activity -# -# -class QueueType(): - # Tasks which download stuff from the internet - FETCH = 1 - - # CPU/Disk intensive tasks - BUILD = 2 - - # Tasks which upload stuff to the internet - PUSH = 3 - - # Queue status for a given element # # @@ -69,14 +57,13 @@ class Queue(): # These should be overridden on class data of of concrete Queue implementations action_name = None complete_name = None - queue_type = None + resources = [] # Resources this queues' jobs want def __init__(self, scheduler): # # Public members # - self.active_jobs = [] # List of active ongoing Jobs, for scheduler observation self.failed_elements = [] # List of failed elements, for the frontend self.processed_elements = [] # List of processed elements, for the frontend self.skipped_elements = [] # List of skipped elements, for the frontend @@ -88,13 +75,13 @@ class Queue(): self._wait_queue = deque() self._done_queue = deque() self._max_retries = 0 - if self.queue_type == QueueType.FETCH or self.queue_type == QueueType.PUSH: - self._max_retries = scheduler.context.sched_network_retries # Assert the subclass has setup class data assert self.action_name is not None assert self.complete_name is not None - assert self.queue_type is not None + + if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources: + self._max_retries = scheduler.context.sched_network_retries ##################################################### # Abstract Methods for Queue implementations # @@ -143,6 +130,7 @@ class Queue(): # Abstract method for handling a successful job completion. # # Args: + # job (Job): The job which completed processing # element (Element): The element which completed processing # result (any): The return value of the process() implementation # success (bool): True if the process() implementation did not @@ -152,7 +140,7 @@ class Queue(): # (bool): True if the element should appear to be processsed, # Otherwise False will count the element as "skipped" # - def done(self, element, result, success): + def done(self, job, element, result, success): pass ##################################################### @@ -170,10 +158,22 @@ class Queue(): if not elts: return + # Note: The internal lists work with jobs. This is not + # reflected in any external methods (except + # pop/peek_ready_jobs). + def create_job(element): + logfile = self._element_log_path(element) + return ElementJob(self._scheduler, self.action_name, + logfile, element=element, queue=self, + resources=self.resources, + action_cb=self.process, + complete_cb=self._job_done, + max_retries=self._max_retries) + # Place skipped elements directly on the done queue - elts = list(elts) - skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP] - wait = [elt for elt in elts if elt not in skip] + jobs = [create_job(elt) for elt in elts] + skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP] + wait = [job for job in jobs if job not in skip] self._wait_queue.extend(wait) self._done_queue.extend(skip) @@ -189,7 +189,7 @@ class Queue(): # def dequeue(self): while self._done_queue: - yield self._done_queue.popleft() + yield self._done_queue.popleft().element # dequeue_ready() # @@ -201,7 +201,10 @@ class Queue(): def dequeue_ready(self): return any(self._done_queue) - # process_ready() + # pop_ready_jobs() + # + # Returns: + # ([Job]): A list of jobs to run # # Process elements in the queue, moving elements which were enqueued # into the dequeue pool, and processing them if necessary. @@ -211,46 +214,45 @@ class Queue(): # # o Elements which are QueueStatus.WAIT will not be effected # - # o Elements which are QueueStatus.READY will be processed - # and added to the Queue.active_jobs list as a result, - # given that the scheduler allows the Queue enough tokens - # for the given queue's job type - # # o Elements which are QueueStatus.SKIP will move directly # to the dequeue pool # - def process_ready(self): - scheduler = self._scheduler + # o For Elements which are QueueStatus.READY a Job will be + # created and returned to the caller, given that the scheduler + # allows the Queue enough resources for the given job + # + def pop_ready_jobs(self): unready = [] + ready = [] - while self._wait_queue and scheduler.get_job_token(self.queue_type): - element = self._wait_queue.popleft() + while self._wait_queue: + job = self._wait_queue.popleft() + element = job.element status = self.status(element) if status == QueueStatus.WAIT: - scheduler.put_job_token(self.queue_type) - unready.append(element) + unready.append(job) continue elif status == QueueStatus.SKIP: - scheduler.put_job_token(self.queue_type) - self._done_queue.append(element) + self._done_queue.append(job) self.skipped_elements.append(element) continue self.prepare(element) - - job = Job(scheduler, element, self.action_name, - self.process, self._job_done, - max_retries=self._max_retries) - scheduler.job_starting(job) - - job.spawn() - self.active_jobs.append(job) + ready.append(job) # These were not ready but were in the beginning, give em # first priority again next time around self._wait_queue.extendleft(unready) + return ready + + def peek_ready_jobs(self): + def ready(job): + return self.status(job.element) == QueueStatus.READY + + yield from (job for job in self._wait_queue if ready(job)) + ##################################################### # Private Methods # ##################################################### @@ -265,12 +267,16 @@ class Queue(): # job (Job): The job which completed # def _update_workspaces(self, element, job): + workspace_dict = None + if job.child_data: + workspace_dict = job.child_data.get('workspace', None) + # Handle any workspace modifications now # - if job.workspace_dict: + if workspace_dict: context = element._get_context() workspaces = context.get_workspaces() - if workspaces.update_workspace(element._get_full_name(), job.workspace_dict): + if workspaces.update_workspace(element._get_full_name(), workspace_dict): try: workspaces.save_config() except BstError as e: @@ -291,17 +297,15 @@ class Queue(): # def _job_done(self, job, element, success, result): - # Remove from our jobs - self.active_jobs.remove(job) - - # Update workspaces in the main task before calling any queue implementation + # Update values that need to be synchronized in the main task + # before calling any queue implementation self._update_workspaces(element, job) # Give the result of the job to the Queue implementor, # and determine if it should be considered as processed # or skipped. try: - processed = self.done(element, result, success) + processed = self.done(job, element, result, success) except BstError as e: @@ -330,7 +334,7 @@ class Queue(): # No exception occured, handle the success/failure state in the normal way # if success: - self._done_queue.append(element) + self._done_queue.append(job) if processed: self.processed_elements.append(element) else: @@ -338,18 +342,22 @@ class Queue(): else: self.failed_elements.append(element) - # Give the token for this job back to the scheduler - # immediately before invoking another round of scheduling - self._scheduler.put_job_token(self.queue_type) - - # Notify frontend - self._scheduler.job_completed(self, job, success) - - self._scheduler.sched() - # Convenience wrapper for Queue implementations to send # a message for the element they are processing def _message(self, element, message_type, brief, **kwargs): context = element._get_context() message = Message(element._get_unique_id(), message_type, brief, **kwargs) context.message(message) + + def _element_log_path(self, element): + project = element._get_project() + context = element._get_context() + + key = element._get_display_key()[1] + action = self.action_name.lower() + logfile = "{key}-{action}.{{pid}}.log".format(key=key, action=action) + + directory = os.path.join(context.logdir, project.name, element.normal_name) + + os.makedirs(directory, exist_ok=True) + return os.path.join(directory, logfile) diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index 3a65f01d0..c7a8f4cc7 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -23,7 +23,8 @@ from ...plugin import _plugin_lookup from ... import SourceError # Local imports -from . import Queue, QueueStatus, QueueType +from . import Queue, QueueStatus +from ..resources import ResourceType # A queue which tracks sources @@ -32,7 +33,7 @@ class TrackQueue(Queue): action_name = "Track" complete_name = "Tracked" - queue_type = QueueType.FETCH + resources = [ResourceType.DOWNLOAD] def process(self, element): return element._track() @@ -47,7 +48,7 @@ class TrackQueue(Queue): return QueueStatus.READY - def done(self, element, result, success): + def done(self, _, element, result, success): if not success: return False diff --git a/buildstream/_scheduler/resources.py b/buildstream/_scheduler/resources.py new file mode 100644 index 000000000..bbf851b06 --- /dev/null +++ b/buildstream/_scheduler/resources.py @@ -0,0 +1,105 @@ +class ResourceType(): + CACHE = 0 + DOWNLOAD = 1 + PROCESS = 2 + UPLOAD = 3 + + +class Resources(): + def __init__(self, num_builders, num_fetchers, num_pushers): + self._max_resources = { + ResourceType.CACHE: 1, + ResourceType.DOWNLOAD: num_fetchers, + ResourceType.PROCESS: num_builders, + ResourceType.UPLOAD: num_pushers + } + + # Resources jobs are currently using. + self._used_resources = { + ResourceType.CACHE: 0, + ResourceType.DOWNLOAD: 0, + ResourceType.PROCESS: 0, + ResourceType.UPLOAD: 0 + } + + # Resources jobs currently want exclusive access to. The set + # of jobs that have asked for exclusive access is the value - + # this is so that we can avoid scheduling any other jobs until + # *all* exclusive jobs that "register interest" have finished + # - which avoids starving them of scheduling time. + self._exclusive_resources = { + ResourceType.CACHE: set(), + ResourceType.DOWNLOAD: set(), + ResourceType.PROCESS: set(), + ResourceType.UPLOAD: set() + } + + def clear_job_resources(self, job): + for resource in job.exclusive_resources: + self._exclusive_resources[resource].remove(hash(job)) + + for resource in job.resources: + self._used_resources[resource] -= 1 + + def reserve_exclusive_resources(self, job): + exclusive = job.exclusive_resources + + # The very first thing we do is to register any exclusive + # resources this job may want. Even if the job is not yet + # allowed to run (because another job is holding the resource + # it wants), we can still set this - it just means that any + # job *currently* using these resources has to finish first, + # and no new jobs wanting these can be launched (except other + # exclusive-access jobs). + # + for resource in exclusive: + self._exclusive_resources[resource].add(hash(job)) + + def reserve_job_resources(self, job): + # First, we check if the job wants to access a resource that + # another job wants exclusive access to. If so, it cannot be + # scheduled. + # + # Note that if *both* jobs want this exclusively, we don't + # fail yet. + # + # FIXME: I *think* we can deadlock if two jobs want disjoint + # sets of exclusive and non-exclusive resources. This + # is currently not possible, but may be worth thinking + # about. + # + for resource in job.resources - job.exclusive_resources: + # If our job wants this resource exclusively, we never + # check this, so we can get away with not (temporarily) + # removing it from the set. + if self._exclusive_resources[resource]: + return False + + # Now we check if anything is currently using any resources + # this job wants exclusively. If so, the job cannot be + # scheduled. + # + # Since jobs that use a resource exclusively are also using + # it, this means only one exclusive job can ever be scheduled + # at a time, despite being allowed to be part of the exclusive + # set. + # + for exclusive in job.exclusive_resources: + if self._used_resources[exclusive] != 0: + return False + + # Finally, we check if we have enough of each resource + # available. If we don't have enough, the job cannot be + # scheduled. + for resource in job.resources: + if (self._max_resources[resource] > 0 and + self._used_resources[resource] >= self._max_resources[resource]): + return False + + # Now we register the fact that our job is using the resources + # it asked for, and tell the scheduler that it is allowed to + # continue. + for resource in job.resources: + self._used_resources[resource] += 1 + + return True diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 7bfbc958e..bc182db32 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -21,12 +21,13 @@ # System imports import os import asyncio +from itertools import chain import signal import datetime from contextlib import contextmanager # Local imports -from .queues import QueueType +from .resources import Resources # A decent return code for Scheduler.run() @@ -69,6 +70,8 @@ class Scheduler(): # # Public members # + self.active_jobs = [] # Jobs currently being run in the scheduler + self.waiting_jobs = [] # Jobs waiting for resources self.queues = None # Exposed for the frontend to print summaries self.context = context # The Context object shared with Queues self.terminated = False # Whether the scheduler was asked to terminate or has terminated @@ -90,13 +93,9 @@ class Scheduler(): self._suspendtime = None self._queue_jobs = True # Whether we should continue to queue jobs - # Initialize task tokens with the number allowed by - # the user configuration - self._job_tokens = { - QueueType.FETCH: context.sched_fetchers, - QueueType.BUILD: context.sched_builders, - QueueType.PUSH: context.sched_pushers - } + self._resources = Resources(context.sched_builders, + context.sched_fetchers, + context.sched_pushers) # run() # @@ -129,7 +128,7 @@ class Scheduler(): self._connect_signals() # Run the queues - self.sched() + self._schedule_queue_jobs() self.loop.run_forever() self.loop.close() @@ -209,18 +208,74 @@ class Scheduler(): starttime = timenow return timenow - starttime - # sched() + # schedule_jobs() + # + # Args: + # jobs ([Job]): A list of jobs to schedule + # + # Schedule 'Job's for the scheduler to run. Jobs scheduled will be + # run as soon any other queueing jobs finish, provided sufficient + # resources are available for them to run + # + def schedule_jobs(self, jobs): + for job in jobs: + self.waiting_jobs.append(job) + + # job_completed(): + # + # Called when a Job completes + # + # Args: + # queue (Queue): The Queue holding a complete job + # job (Job): The completed Job + # success (bool): Whether the Job completed with a success status + # + def job_completed(self, job, success): + self._resources.clear_job_resources(job) + self.active_jobs.remove(job) + self._job_complete_callback(job, success) + self._schedule_queue_jobs() + self._sched() + + ####################################################### + # Local Private Methods # + ####################################################### + + # _sched() # # The main driving function of the scheduler, it will be called - # automatically when Scheduler.run() is called initially, and needs - # to be called whenever a job can potentially be scheduled, usually - # when a Queue completes handling of a job. + # automatically when Scheduler.run() is called initially, + # + def _sched(self): + for job in self.waiting_jobs: + self._resources.reserve_exclusive_resources(job) + + for job in self.waiting_jobs: + if not self._resources.reserve_job_resources(job): + continue + + job.spawn() + self.waiting_jobs.remove(job) + self.active_jobs.append(job) + + if self._job_start_callback: + self._job_start_callback(job) + + # If nothings ticking, time to bail out + if not self.active_jobs and not self.waiting_jobs: + self.loop.stop() + + # _schedule_queue_jobs() # - # This will process the Queues and pull elements through the Queues + # Ask the queues what jobs they want to schedule and schedule + # them. This is done here so we can ask for new jobs when jobs + # from previous queues become available. + # + # This will process the Queues, pull elements through the Queues # and process anything that is ready. # - def sched(self): - + def _schedule_queue_jobs(self): + ready = [] process_queues = True while self._queue_jobs and process_queues: @@ -233,90 +288,29 @@ class Scheduler(): # Dequeue processed elements for the next queue elements = list(queue.dequeue()) - elements = list(elements) # Kickoff whatever processes can be processed at this time # - # We start by queuing from the last queue first, because we want to - # give priority to queues later in the scheduling process in the case - # that multiple queues share the same token type. + # We start by queuing from the last queue first, because + # we want to give priority to queues later in the + # scheduling process in the case that multiple queues + # share the same token type. # - # This avoids starvation situations where we dont move on to fetch - # tasks for elements which failed to pull, and thus need all the pulls - # to complete before ever starting a build - for queue in reversed(self.queues): - queue.process_ready() - - # process_ready() may have skipped jobs, adding them to the done_queue. - # Pull these skipped elements forward to the next queue and process them. + # This avoids starvation situations where we dont move on + # to fetch tasks for elements which failed to pull, and + # thus need all the pulls to complete before ever starting + # a build + ready.extend(chain.from_iterable( + queue.pop_ready_jobs() for queue in reversed(self.queues) + )) + + # pop_ready_jobs() may have skipped jobs, adding them to + # the done_queue. Pull these skipped elements forward to + # the next queue and process them. process_queues = any(q.dequeue_ready() for q in self.queues) - # If nothings ticking, time to bail out - ticking = 0 - for queue in self.queues: - ticking += len(queue.active_jobs) - - if ticking == 0: - self.loop.stop() - - # get_job_token(): - # - # Used by the Queue object to obtain a token for - # processing a Job, if a Queue does not receive a token - # then it must wait until a later time in order to - # process pending jobs. - # - # Args: - # queue_type (QueueType): The type of token to obtain - # - # Returns: - # (bool): Whether a token was handed out or not - # - def get_job_token(self, queue_type): - if self._job_tokens[queue_type] > 0: - self._job_tokens[queue_type] -= 1 - return True - return False - - # put_job_token(): - # - # Return a job token to the scheduler. Tokens previously - # received with get_job_token() must be returned to - # the scheduler once the associated job is complete. - # - # Args: - # queue_type (QueueType): The type of token to obtain - # - def put_job_token(self, queue_type): - self._job_tokens[queue_type] += 1 - - # job_starting(): - # - # Called by the Queue when starting a Job - # - # Args: - # job (Job): The starting Job - # - def job_starting(self, job): - if self._job_start_callback: - self._job_start_callback(job.element, job.action_name) - - # job_completed(): - # - # Called by the Queue when a Job completes - # - # Args: - # queue (Queue): The Queue holding a complete job - # job (Job): The completed Job - # success (bool): Whether the Job completed with a success status - # - def job_completed(self, queue, job, success): - if self._job_complete_callback: - self._job_complete_callback(job.element, queue, job.action_name, success) - - ####################################################### - # Local Private Methods # - ####################################################### + self.schedule_jobs(ready) + self._sched() # _suspend_jobs() # @@ -326,9 +320,8 @@ class Scheduler(): if not self.suspended: self._suspendtime = datetime.datetime.now() self.suspended = True - for queue in self.queues: - for job in queue.active_jobs: - job.suspend() + for job in self.active_jobs: + job.suspend() # _resume_jobs() # @@ -336,9 +329,8 @@ class Scheduler(): # def _resume_jobs(self): if self.suspended: - for queue in self.queues: - for job in queue.active_jobs: - job.resume() + for job in self.active_jobs: + job.resume() self.suspended = False self._starttime += (datetime.datetime.now() - self._suspendtime) self._suspendtime = None @@ -401,19 +393,18 @@ class Scheduler(): wait_limit = 20.0 # First tell all jobs to terminate - for queue in self.queues: - for job in queue.active_jobs: - job.terminate() + for job in self.active_jobs: + job.terminate() # Now wait for them to really terminate - for queue in self.queues: - for job in queue.active_jobs: - elapsed = datetime.datetime.now() - wait_start - timeout = max(wait_limit - elapsed.total_seconds(), 0.0) - if not job.terminate_wait(timeout): - job.kill() - - self.loop.stop() + for job in self.active_jobs: + elapsed = datetime.datetime.now() - wait_start + timeout = max(wait_limit - elapsed.total_seconds(), 0.0) + if not job.terminate_wait(timeout): + job.kill() + + # Clear out the waiting jobs + self.waiting_jobs = [] # Regular timeout for driving status in the UI def _tick(self): |