summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r--src/buildstream/_scheduler/__init__.py30
-rw-r--r--src/buildstream/_scheduler/jobs/__init__.py23
-rw-r--r--src/buildstream/_scheduler/jobs/cachesizejob.py41
-rw-r--r--src/buildstream/_scheduler/jobs/cleanupjob.py50
-rw-r--r--src/buildstream/_scheduler/jobs/elementjob.py115
-rw-r--r--src/buildstream/_scheduler/jobs/job.py682
-rw-r--r--src/buildstream/_scheduler/queues/__init__.py1
-rw-r--r--src/buildstream/_scheduler/queues/artifactpushqueue.py44
-rw-r--r--src/buildstream/_scheduler/queues/buildqueue.py117
-rw-r--r--src/buildstream/_scheduler/queues/fetchqueue.py80
-rw-r--r--src/buildstream/_scheduler/queues/pullqueue.py66
-rw-r--r--src/buildstream/_scheduler/queues/queue.py328
-rw-r--r--src/buildstream/_scheduler/queues/sourcepushqueue.py42
-rw-r--r--src/buildstream/_scheduler/queues/trackqueue.py62
-rw-r--r--src/buildstream/_scheduler/resources.py166
-rw-r--r--src/buildstream/_scheduler/scheduler.py602
16 files changed, 2449 insertions, 0 deletions
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
new file mode 100644
index 000000000..d2f458fa5
--- /dev/null
+++ b/src/buildstream/_scheduler/__init__.py
@@ -0,0 +1,30 @@
+#
+# Copyright (C) 2017 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+
+from .queues import Queue, QueueStatus
+
+from .queues.fetchqueue import FetchQueue
+from .queues.sourcepushqueue import SourcePushQueue
+from .queues.trackqueue import TrackQueue
+from .queues.buildqueue import BuildQueue
+from .queues.artifactpushqueue import ArtifactPushQueue
+from .queues.pullqueue import PullQueue
+
+from .scheduler import Scheduler, SchedStatus
+from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/jobs/__init__.py b/src/buildstream/_scheduler/jobs/__init__.py
new file mode 100644
index 000000000..3e213171a
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/__init__.py
@@ -0,0 +1,23 @@
+#
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Tristan Maat <tristan.maat@codethink.co.uk>
+
+from .elementjob import ElementJob
+from .cachesizejob import CacheSizeJob
+from .cleanupjob import CleanupJob
+from .job import JobStatus
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py
new file mode 100644
index 000000000..5f27b7fc1
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/cachesizejob.py
@@ -0,0 +1,41 @@
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author:
+# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
+#
+from .job import Job, JobStatus
+
+
+class CacheSizeJob(Job):
+ def __init__(self, *args, complete_cb, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._complete_cb = complete_cb
+
+ context = self._scheduler.context
+ self._casquota = context.get_casquota()
+
+ def child_process(self):
+ return self._casquota.compute_cache_size()
+
+ def parent_complete(self, status, result):
+ if status == JobStatus.OK:
+ self._casquota.set_cache_size(result)
+
+ if self._complete_cb:
+ self._complete_cb(status, result)
+
+ def child_process_data(self):
+ return {}
diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py
new file mode 100644
index 000000000..4764b30b3
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/cleanupjob.py
@@ -0,0 +1,50 @@
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author:
+# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
+#
+from .job import Job, JobStatus
+
+
+class CleanupJob(Job):
+ def __init__(self, *args, complete_cb, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._complete_cb = complete_cb
+
+ context = self._scheduler.context
+ self._casquota = context.get_casquota()
+
+ def child_process(self):
+ def progress():
+ self.send_message('update-cache-size',
+ self._casquota.get_cache_size())
+ return self._casquota.clean(progress)
+
+ def handle_message(self, message_type, message):
+ # Update the cache size in the main process as we go,
+ # this provides better feedback in the UI.
+ if message_type == 'update-cache-size':
+ self._casquota.set_cache_size(message, write_to_disk=False)
+ return True
+
+ return False
+
+ def parent_complete(self, status, result):
+ if status == JobStatus.OK:
+ self._casquota.set_cache_size(result, write_to_disk=False)
+
+ if self._complete_cb:
+ self._complete_cb(status, result)
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
new file mode 100644
index 000000000..fb5d38e11
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -0,0 +1,115 @@
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author:
+# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
+#
+from ruamel import yaml
+
+from ..._message import Message, MessageType
+
+from .job import Job
+
+
+# ElementJob()
+#
+# A job to run an element's commands. When this job is spawned
+# `action_cb` will be called, and when it completes `complete_cb` will
+# be called.
+#
+# Args:
+# scheduler (Scheduler): The scheduler
+# action_name (str): The queue action name
+# max_retries (int): The maximum number of retries
+# action_cb (callable): The function to execute on the child
+# complete_cb (callable): The function to execute when the job completes
+# element (Element): The element to work on
+# kwargs: Remaining Job() constructor arguments
+#
+# Here is the calling signature of the action_cb:
+#
+# action_cb():
+#
+# This function will be called in the child task
+#
+# Args:
+# element (Element): The element passed to the Job() constructor
+#
+# Returns:
+# (object): Any abstract simple python object, including a string, int,
+# bool, list or dict, this must be a simple serializable object.
+#
+# Here is the calling signature of the complete_cb:
+#
+# complete_cb():
+#
+# This function will be called when the child task completes
+#
+# Args:
+# job (Job): The job object which completed
+# element (Element): The element passed to the Job() constructor
+# status (JobStatus): The status of whether the workload raised an exception
+# result (object): The deserialized object returned by the `action_cb`, or None
+# if `success` is False
+#
+class ElementJob(Job):
+ def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.queue = queue
+ self._element = element
+ self._action_cb = action_cb # The action callable function
+ self._complete_cb = complete_cb # The complete callable function
+
+ # Set the task wide ID for logging purposes
+ self.set_task_id(element._unique_id)
+
+ @property
+ def element(self):
+ return self._element
+
+ def child_process(self):
+
+ # Print the element's environment at the beginning of any element's log file.
+ #
+ # This should probably be omitted for non-build tasks but it's harmless here
+ elt_env = self._element.get_environment()
+ env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
+ self.message(MessageType.LOG,
+ "Build environment for element {}".format(self._element.name),
+ detail=env_dump)
+
+ # Run the action
+ return self._action_cb(self._element)
+
+ def parent_complete(self, status, result):
+ self._complete_cb(self, self._element, status, self._result)
+
+ def message(self, message_type, message, **kwargs):
+ args = dict(kwargs)
+ args['scheduler'] = True
+ self._scheduler.context.message(
+ Message(self._element._unique_id,
+ message_type,
+ message,
+ **args))
+
+ def child_process_data(self):
+ data = {}
+
+ workspace = self._element._get_workspace()
+ if workspace is not None:
+ data['workspace'] = workspace.to_dict()
+
+ return data
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
new file mode 100644
index 000000000..dd91d1634
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -0,0 +1,682 @@
+#
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+# Tristan Maat <tristan.maat@codethink.co.uk>
+
+# System imports
+import os
+import sys
+import signal
+import datetime
+import traceback
+import asyncio
+import multiprocessing
+
+# BuildStream toplevel imports
+from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
+from ..._message import Message, MessageType, unconditional_messages
+from ... import _signals, utils
+
+# Return code values shutdown of job handling child processes
+#
+RC_OK = 0
+RC_FAIL = 1
+RC_PERM_FAIL = 2
+RC_SKIPPED = 3
+
+
+# JobStatus:
+#
+# The job completion status, passed back through the
+# complete callbacks.
+#
+class JobStatus():
+ # Job succeeded
+ OK = 0
+
+ # A temporary BstError was raised
+ FAIL = 1
+
+ # A SkipJob was raised
+ SKIPPED = 3
+
+
+# Used to distinguish between status messages and return values
+class _Envelope():
+ def __init__(self, message_type, message):
+ self.message_type = message_type
+ self.message = message
+
+
+# Process class that doesn't call waitpid on its own.
+# This prevents conflicts with the asyncio child watcher.
+class Process(multiprocessing.Process):
+ # pylint: disable=attribute-defined-outside-init
+ def start(self):
+ self._popen = self._Popen(self)
+ self._sentinel = self._popen.sentinel
+
+
+# Job()
+#
+# The Job object represents a parallel task, when calling Job.spawn(),
+# the given `Job.child_process()` will be called in parallel to the
+# calling process, and `Job.parent_complete()` will be called with the
+# action result in the calling process when the job completes.
+#
+# Args:
+# scheduler (Scheduler): The scheduler
+# action_name (str): The queue action name
+# logfile (str): A template string that points to the logfile
+# that should be used - should contain {pid}.
+# max_retries (int): The maximum number of retries
+#
+class Job():
+
+ def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
+
+ #
+ # Public members
+ #
+ self.action_name = action_name # The action name for the Queue
+ self.child_data = None # Data to be sent to the main process
+
+ #
+ # Private members
+ #
+ self._scheduler = scheduler # The scheduler
+ self._queue = None # A message passing queue
+ self._process = None # The Process object
+ self._watcher = None # Child process watcher
+ self._listening = False # Whether the parent is currently listening
+ self._suspended = False # Whether this job is currently suspended
+ self._max_retries = max_retries # Maximum number of automatic retries
+ self._result = None # Return value of child action in the parent
+ self._tries = 0 # Try count, for retryable jobs
+ self._terminated = False # Whether this job has been explicitly terminated
+
+ self._logfile = logfile
+ self._task_id = None
+
+ # spawn()
+ #
+ # Spawns the job.
+ #
+ def spawn(self):
+
+ self._queue = multiprocessing.Queue()
+
+ self._tries += 1
+ self._parent_start_listening()
+
+ # Spawn the process
+ self._process = Process(target=self._child_action, args=[self._queue])
+
+ # Block signals which are handled in the main process such that
+ # the child process does not inherit the parent's state, but the main
+ # process will be notified of any signal after we launch the child.
+ #
+ with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
+ self._process.start()
+
+ # Wait for the child task to complete.
+ #
+ # This is a tricky part of python which doesnt seem to
+ # make it to the online docs:
+ #
+ # o asyncio.get_child_watcher() will return a SafeChildWatcher() instance
+ # which is the default type of watcher, and the instance belongs to the
+ # "event loop policy" in use (so there is only one in the main process).
+ #
+ # o SafeChildWatcher() will register a SIGCHLD handler with the asyncio
+ # loop, and will selectively reap any child pids which have been
+ # terminated.
+ #
+ # o At registration time, the process will immediately be checked with
+ # `os.waitpid()` and will be reaped immediately, before add_child_handler()
+ # returns.
+ #
+ # The self._parent_child_completed callback passed here will normally
+ # be called after the child task has been reaped with `os.waitpid()`, in
+ # an event loop callback. Otherwise, if the job completes too fast, then
+ # the callback is called immediately.
+ #
+ self._watcher = asyncio.get_child_watcher()
+ self._watcher.add_child_handler(self._process.pid, self._parent_child_completed)
+
+ # terminate()
+ #
+ # Politely request that an ongoing job terminate soon.
+ #
+ # This will send a SIGTERM signal to the Job process.
+ #
+ def terminate(self):
+
+ # First resume the job if it's suspended
+ self.resume(silent=True)
+
+ self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
+
+ # Make sure there is no garbage on the queue
+ self._parent_stop_listening()
+
+ # Terminate the process using multiprocessing API pathway
+ self._process.terminate()
+
+ self._terminated = True
+
+ # get_terminated()
+ #
+ # Check if a job has been terminated.
+ #
+ # Returns:
+ # (bool): True in the main process if Job.terminate() was called.
+ #
+ def get_terminated(self):
+ return self._terminated
+
+ # terminate_wait()
+ #
+ # Wait for terminated jobs to complete
+ #
+ # Args:
+ # timeout (float): Seconds to wait
+ #
+ # Returns:
+ # (bool): True if the process terminated cleanly, otherwise False
+ #
+ def terminate_wait(self, timeout):
+
+ # Join the child process after sending SIGTERM
+ self._process.join(timeout)
+ return self._process.exitcode is not None
+
+ # kill()
+ #
+ # Forcefully kill the process, and any children it might have.
+ #
+ def kill(self):
+ # Force kill
+ self.message(MessageType.WARN,
+ "{} did not terminate gracefully, killing".format(self.action_name))
+ utils._kill_process_tree(self._process.pid)
+
+ # suspend()
+ #
+ # Suspend this job.
+ #
+ def suspend(self):
+ if not self._suspended:
+ self.message(MessageType.STATUS,
+ "{} suspending".format(self.action_name))
+
+ try:
+ # Use SIGTSTP so that child processes may handle and propagate
+ # it to processes they spawn that become session leaders
+ os.kill(self._process.pid, signal.SIGTSTP)
+
+ # For some reason we receive exactly one suspend event for every
+ # SIGTSTP we send to the child fork(), even though the child forks
+ # are setsid(). We keep a count of these so we can ignore them
+ # in our event loop suspend_event()
+ self._scheduler.internal_stops += 1
+ self._suspended = True
+ except ProcessLookupError:
+ # ignore, process has already exited
+ pass
+
+ # resume()
+ #
+ # Resume this suspended job.
+ #
+ def resume(self, silent=False):
+ if self._suspended:
+ if not silent and not self._scheduler.terminated:
+ self.message(MessageType.STATUS,
+ "{} resuming".format(self.action_name))
+
+ os.kill(self._process.pid, signal.SIGCONT)
+ self._suspended = False
+
+ # set_task_id()
+ #
+ # This is called by Job subclasses to set a plugin ID
+ # associated with the task at large (if any element is related
+ # to the task).
+ #
+ # The task ID helps keep messages in the frontend coherent
+ # in the case that multiple plugins log in the context of
+ # a single task (e.g. running integration commands should appear
+ # in the frontend for the element being built, not the element
+ # running the integration commands).
+ #
+ # Args:
+ # task_id (int): The plugin identifier for this task
+ #
+ def set_task_id(self, task_id):
+ self._task_id = task_id
+
+ # send_message()
+ #
+ # To be called from inside Job.child_process() implementations
+ # to send messages to the main process during processing.
+ #
+ # These messages will be processed by the class's Job.handle_message()
+ # implementation.
+ #
+ def send_message(self, message_type, message):
+ self._queue.put(_Envelope(message_type, message))
+
+ #######################################################
+ # Abstract Methods #
+ #######################################################
+
+ # handle_message()
+ #
+ # Handle a custom message. This will be called in the main process in
+ # response to any messages sent to the main proces using the
+ # Job.send_message() API from inside a Job.child_process() implementation
+ #
+ # Args:
+ # message_type (str): A string to identify the message type
+ # message (any): A simple serializable object
+ #
+ # Returns:
+ # (bool): Should return a truthy value if message_type is handled.
+ #
+ def handle_message(self, message_type, message):
+ return False
+
+ # parent_complete()
+ #
+ # This will be executed after the job finishes, and is expected to
+ # pass the result to the main thread.
+ #
+ # Args:
+ # status (JobStatus): The job exit status
+ # result (any): The result returned by child_process().
+ #
+ def parent_complete(self, status, result):
+ raise ImplError("Job '{kind}' does not implement parent_complete()"
+ .format(kind=type(self).__name__))
+
+ # child_process()
+ #
+ # This will be executed after fork(), and is intended to perform
+ # the job's task.
+ #
+ # Returns:
+ # (any): A (simple!) object to be returned to the main thread
+ # as the result.
+ #
+ def child_process(self):
+ raise ImplError("Job '{kind}' does not implement child_process()"
+ .format(kind=type(self).__name__))
+
+ # message():
+ #
+ # Logs a message, this will be logged in the task's logfile and
+ # conditionally also be sent to the frontend.
+ #
+ # Args:
+ # message_type (MessageType): The type of message to send
+ # message (str): The message
+ # kwargs: Remaining Message() constructor arguments
+ #
+ def message(self, message_type, message, **kwargs):
+ args = dict(kwargs)
+ args['scheduler'] = True
+ self._scheduler.context.message(Message(None, message_type, message, **args))
+
+ # child_process_data()
+ #
+ # Abstract method to retrieve additional data that should be
+ # returned to the parent process. Note that the job result is
+ # retrieved independently.
+ #
+ # Values can later be retrieved in Job.child_data.
+ #
+ # Returns:
+ # (dict) A dict containing values to be reported to the main process
+ #
+ def child_process_data(self):
+ return {}
+
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+ #
+ # Methods prefixed with the word 'child' take place in the child process
+ #
+ # Methods prefixed with the word 'parent' take place in the parent process
+ #
+ # Other methods can be called in both child or parent processes
+ #
+ #######################################################
+
+ # _child_action()
+ #
+ # Perform the action in the child process, this calls the action_cb.
+ #
+ # Args:
+ # queue (multiprocessing.Queue): The message queue for IPC
+ #
+ def _child_action(self, queue):
+
+ # This avoids some SIGTSTP signals from grandchildren
+ # getting propagated up to the master process
+ os.setsid()
+
+ # First set back to the default signal handlers for the signals
+ # we handle, and then clear their blocked state.
+ #
+ signal_list = [signal.SIGTSTP, signal.SIGTERM]
+ for sig in signal_list:
+ signal.signal(sig, signal.SIG_DFL)
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
+
+ # Assign the queue we passed across the process boundaries
+ #
+ # Set the global message handler in this child
+ # process to forward messages to the parent process
+ self._queue = queue
+ self._scheduler.context.set_message_handler(self._child_message_handler)
+
+ starttime = datetime.datetime.now()
+ stopped_time = None
+
+ def stop_time():
+ nonlocal stopped_time
+ stopped_time = datetime.datetime.now()
+
+ def resume_time():
+ nonlocal stopped_time
+ nonlocal starttime
+ starttime += (datetime.datetime.now() - stopped_time)
+
+ # Time, log and and run the action function
+ #
+ with _signals.suspendable(stop_time, resume_time), \
+ self._scheduler.context.recorded_messages(self._logfile) as filename:
+
+ self.message(MessageType.START, self.action_name, logfile=filename)
+
+ try:
+ # Try the task action
+ result = self.child_process() # pylint: disable=assignment-from-no-return
+ except SkipJob as e:
+ elapsed = datetime.datetime.now() - starttime
+ self.message(MessageType.SKIPPED, str(e),
+ elapsed=elapsed, logfile=filename)
+
+ # Alert parent of skip by return code
+ self._child_shutdown(RC_SKIPPED)
+ except BstError as e:
+ elapsed = datetime.datetime.now() - starttime
+ retry_flag = e.temporary
+
+ if retry_flag and (self._tries <= self._max_retries):
+ self.message(MessageType.FAIL,
+ "Try #{} failed, retrying".format(self._tries),
+ elapsed=elapsed, logfile=filename)
+ else:
+ self.message(MessageType.FAIL, str(e),
+ elapsed=elapsed, detail=e.detail,
+ logfile=filename, sandbox=e.sandbox)
+
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
+
+ # Report the exception to the parent (for internal testing purposes)
+ self._child_send_error(e)
+
+ # Set return code based on whether or not the error was temporary.
+ #
+ self._child_shutdown(RC_FAIL if retry_flag else RC_PERM_FAIL)
+
+ except Exception as e: # pylint: disable=broad-except
+
+ # If an unhandled (not normalized to BstError) occurs, that's a bug,
+ # send the traceback and formatted exception back to the frontend
+ # and print it to the log file.
+ #
+ elapsed = datetime.datetime.now() - starttime
+ detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
+
+ self.message(MessageType.BUG, self.action_name,
+ elapsed=elapsed, detail=detail,
+ logfile=filename)
+ # Unhandled exceptions should permenantly fail
+ self._child_shutdown(RC_PERM_FAIL)
+
+ else:
+ # No exception occurred in the action
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
+ self._child_send_result(result)
+
+ elapsed = datetime.datetime.now() - starttime
+ self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
+ logfile=filename)
+
+ # Shutdown needs to stay outside of the above context manager,
+ # make sure we dont try to handle SIGTERM while the process
+ # is already busy in sys.exit()
+ self._child_shutdown(RC_OK)
+
+ # _child_send_error()
+ #
+ # Sends an error to the main process through the message queue
+ #
+ # Args:
+ # e (Exception): The error to send
+ #
+ def _child_send_error(self, e):
+ domain = None
+ reason = None
+
+ if isinstance(e, BstError):
+ domain = e.domain
+ reason = e.reason
+
+ envelope = _Envelope('error', {
+ 'domain': domain,
+ 'reason': reason
+ })
+ self._queue.put(envelope)
+
+ # _child_send_result()
+ #
+ # Sends the serialized result to the main process through the message queue
+ #
+ # Args:
+ # result (object): A simple serializable object, or None
+ #
+ # Note: If None is passed here, nothing needs to be sent, the
+ # result member in the parent process will simply remain None.
+ #
+ def _child_send_result(self, result):
+ if result is not None:
+ envelope = _Envelope('result', result)
+ self._queue.put(envelope)
+
+ # _child_shutdown()
+ #
+ # Shuts down the child process by cleaning up and exiting the process
+ #
+ # Args:
+ # exit_code (int): The exit code to exit with
+ #
+ def _child_shutdown(self, exit_code):
+ self._queue.close()
+ sys.exit(exit_code)
+
+ # _child_message_handler()
+ #
+ # A Context delegate for handling messages, this replaces the
+ # frontend's main message handler in the context of a child task
+ # and performs local logging to the local log file before sending
+ # the message back to the parent process for further propagation.
+ #
+ # Args:
+ # message (Message): The message to log
+ # context (Context): The context object delegating this message
+ #
+ def _child_message_handler(self, message, context):
+
+ message.action_name = self.action_name
+ message.task_id = self._task_id
+
+ # Send to frontend if appropriate
+ if context.silent_messages() and (message.message_type not in unconditional_messages):
+ return
+
+ if message.message_type == MessageType.LOG:
+ return
+
+ self._queue.put(_Envelope('message', message))
+
+ # _parent_shutdown()
+ #
+ # Shuts down the Job on the parent side by reading any remaining
+ # messages on the message queue and cleaning up any resources.
+ #
+ def _parent_shutdown(self):
+ # Make sure we've read everything we need and then stop listening
+ self._parent_process_queue()
+ self._parent_stop_listening()
+
+ # _parent_child_completed()
+ #
+ # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
+ #
+ # Args:
+ # pid (int): The PID of the child which completed
+ # returncode (int): The return code of the child process
+ #
+ def _parent_child_completed(self, pid, returncode):
+ self._parent_shutdown()
+
+ # We don't want to retry if we got OK or a permanent fail.
+ retry_flag = returncode == RC_FAIL
+
+ if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
+ self.spawn()
+ return
+
+ # Resolve the outward facing overall job completion status
+ #
+ if returncode == RC_OK:
+ status = JobStatus.OK
+ elif returncode == RC_SKIPPED:
+ status = JobStatus.SKIPPED
+ elif returncode in (RC_FAIL, RC_PERM_FAIL):
+ status = JobStatus.FAIL
+ else:
+ status = JobStatus.FAIL
+
+ self.parent_complete(status, self._result)
+ self._scheduler.job_completed(self, status)
+
+ # Force the deletion of the queue and process objects to try and clean up FDs
+ self._queue = self._process = None
+
+ # _parent_process_envelope()
+ #
+ # Processes a message Envelope deserialized form the message queue.
+ #
+ # this will have the side effect of assigning some local state
+ # on the Job in the parent process for later inspection when the
+ # child process completes.
+ #
+ # Args:
+ # envelope (Envelope): The message envelope
+ #
+ def _parent_process_envelope(self, envelope):
+ if not self._listening:
+ return
+
+ if envelope.message_type == 'message':
+ # Propagate received messages from children
+ # back through the context.
+ self._scheduler.context.message(envelope.message)
+ elif envelope.message_type == 'error':
+ # For regression tests only, save the last error domain / reason
+ # reported from a child task in the main process, this global state
+ # is currently managed in _exceptions.py
+ set_last_task_error(envelope.message['domain'],
+ envelope.message['reason'])
+ elif envelope.message_type == 'result':
+ assert self._result is None
+ self._result = envelope.message
+ elif envelope.message_type == 'child_data':
+ # If we retry a job, we assign a new value to this
+ self.child_data = envelope.message
+
+ # Try Job subclass specific messages now
+ elif not self.handle_message(envelope.message_type,
+ envelope.message):
+ assert 0, "Unhandled message type '{}': {}" \
+ .format(envelope.message_type, envelope.message)
+
+ # _parent_process_queue()
+ #
+ # Reads back message envelopes from the message queue
+ # in the parent process.
+ #
+ def _parent_process_queue(self):
+ while not self._queue.empty():
+ envelope = self._queue.get_nowait()
+ self._parent_process_envelope(envelope)
+
+ # _parent_recv()
+ #
+ # A callback to handle I/O events from the message
+ # queue file descriptor in the main process message loop
+ #
+ def _parent_recv(self, *args):
+ self._parent_process_queue()
+
+ # _parent_start_listening()
+ #
+ # Starts listening on the message queue
+ #
+ def _parent_start_listening(self):
+ # Warning: Platform specific code up ahead
+ #
+ # The multiprocessing.Queue object does not tell us how
+ # to receive io events in the receiving process, so we
+ # need to sneak in and get its file descriptor.
+ #
+ # The _reader member of the Queue is currently private
+ # but well known, perhaps it will become public:
+ #
+ # http://bugs.python.org/issue3831
+ #
+ if not self._listening:
+ self._scheduler.loop.add_reader(
+ self._queue._reader.fileno(), self._parent_recv)
+ self._listening = True
+
+ # _parent_stop_listening()
+ #
+ # Stops listening on the message queue
+ #
+ def _parent_stop_listening(self):
+ if self._listening:
+ self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+ self._listening = False
diff --git a/src/buildstream/_scheduler/queues/__init__.py b/src/buildstream/_scheduler/queues/__init__.py
new file mode 100644
index 000000000..3b2293919
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/__init__.py
@@ -0,0 +1 @@
+from .queue import Queue, QueueStatus
diff --git a/src/buildstream/_scheduler/queues/artifactpushqueue.py b/src/buildstream/_scheduler/queues/artifactpushqueue.py
new file mode 100644
index 000000000..b861d4fc7
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/artifactpushqueue.py
@@ -0,0 +1,44 @@
+#
+# Copyright (C) 2016 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+
+# Local imports
+from . import Queue, QueueStatus
+from ..resources import ResourceType
+from ..._exceptions import SkipJob
+
+
+# A queue which pushes element artifacts
+#
+class ArtifactPushQueue(Queue):
+
+ action_name = "Push"
+ complete_name = "Pushed"
+ resources = [ResourceType.UPLOAD]
+
+ def process(self, element):
+ # returns whether an artifact was uploaded or not
+ if not element._push():
+ raise SkipJob(self.action_name)
+
+ def status(self, element):
+ if element._skip_push():
+ return QueueStatus.SKIP
+
+ return QueueStatus.READY
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
new file mode 100644
index 000000000..aa489f381
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -0,0 +1,117 @@
+#
+# Copyright (C) 2016 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+
+from datetime import timedelta
+
+from . import Queue, QueueStatus
+from ..jobs import ElementJob, JobStatus
+from ..resources import ResourceType
+from ..._message import MessageType
+
+
+# A queue which assembles elements
+#
+class BuildQueue(Queue):
+
+ action_name = "Build"
+ complete_name = "Built"
+ resources = [ResourceType.PROCESS, ResourceType.CACHE]
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._tried = set()
+
+ def enqueue(self, elts):
+ to_queue = []
+
+ for element in elts:
+ if not element._cached_failure() or element in self._tried:
+ to_queue.append(element)
+ continue
+
+ # XXX: Fix this, See https://mail.gnome.org/archives/buildstream-list/2018-September/msg00029.html
+ # Bypass queue processing entirely the first time it's tried.
+ self._tried.add(element)
+ _, description, detail = element._get_build_result()
+ logfile = element._get_build_log()
+ self._message(element, MessageType.FAIL, description,
+ detail=detail, action_name=self.action_name,
+ elapsed=timedelta(seconds=0),
+ logfile=logfile)
+ job = ElementJob(self._scheduler, self.action_name,
+ logfile, element=element, queue=self,
+ action_cb=self.process,
+ complete_cb=self._job_done,
+ max_retries=self._max_retries)
+ self._done_queue.append(element)
+ self.failed_elements.append(element)
+ self._scheduler._job_complete_callback(job, False)
+
+ return super().enqueue(to_queue)
+
+ def process(self, element):
+ return element._assemble()
+
+ def status(self, element):
+ if not element._is_required():
+ # Artifact is not currently required but it may be requested later.
+ # Keep it in the queue.
+ return QueueStatus.WAIT
+
+ if element._cached_success():
+ return QueueStatus.SKIP
+
+ if not element._buildable():
+ return QueueStatus.WAIT
+
+ return QueueStatus.READY
+
+ def _check_cache_size(self, job, element, artifact_size):
+
+ # After completing a build job, add the artifact size
+ # as returned from Element._assemble() to the estimated
+ # artifact cache size
+ #
+ context = self._scheduler.context
+ artifacts = context.artifactcache
+
+ artifacts.add_artifact_size(artifact_size)
+
+ # If the estimated size outgrows the quota, ask the scheduler
+ # to queue a job to actually check the real cache size.
+ #
+ if artifacts.full():
+ self._scheduler.check_cache_size()
+
+ def done(self, job, element, result, status):
+
+ # Inform element in main process that assembly is done
+ element._assemble_done()
+
+ # This has to be done after _assemble_done, such that the
+ # element may register its cache key as required
+ #
+ # FIXME: Element._assemble() does not report both the failure state and the
+ # size of the newly cached failed artifact, so we can only adjust the
+ # artifact cache size for a successful build even though we know a
+ # failed build also grows the artifact cache size.
+ #
+ if status == JobStatus.OK:
+ self._check_cache_size(job, element, result)
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
new file mode 100644
index 000000000..9edeebb1d
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -0,0 +1,80 @@
+#
+# Copyright (C) 2016 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+
+# BuildStream toplevel imports
+from ... import Consistency
+
+# Local imports
+from . import Queue, QueueStatus
+from ..resources import ResourceType
+from ..jobs import JobStatus
+
+
+# A queue which fetches element sources
+#
+class FetchQueue(Queue):
+
+ action_name = "Fetch"
+ complete_name = "Fetched"
+ resources = [ResourceType.DOWNLOAD]
+
+ def __init__(self, scheduler, skip_cached=False, fetch_original=False):
+ super().__init__(scheduler)
+
+ self._skip_cached = skip_cached
+ self._fetch_original = fetch_original
+
+ def process(self, element):
+ element._fetch(fetch_original=self._fetch_original)
+
+ def status(self, element):
+ if not element._is_required():
+ # Artifact is not currently required but it may be requested later.
+ # Keep it in the queue.
+ return QueueStatus.WAIT
+
+ # Optionally skip elements that are already in the artifact cache
+ if self._skip_cached:
+ if not element._can_query_cache():
+ return QueueStatus.WAIT
+
+ if element._cached():
+ return QueueStatus.SKIP
+
+ # This will automatically skip elements which
+ # have no sources.
+
+ if not element._should_fetch(self._fetch_original):
+ return QueueStatus.SKIP
+
+ return QueueStatus.READY
+
+ def done(self, _, element, result, status):
+
+ if status == JobStatus.FAIL:
+ return
+
+ element._fetch_done()
+
+ # Successful fetch, we must be CACHED or in the sourcecache
+ if self._fetch_original:
+ assert element._get_consistency() == Consistency.CACHED
+ else:
+ assert element._source_cached()
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
new file mode 100644
index 000000000..013ee6489
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -0,0 +1,66 @@
+#
+# Copyright (C) 2016 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+
+# Local imports
+from . import Queue, QueueStatus
+from ..resources import ResourceType
+from ..jobs import JobStatus
+from ..._exceptions import SkipJob
+
+
+# A queue which pulls element artifacts
+#
+class PullQueue(Queue):
+
+ action_name = "Pull"
+ complete_name = "Pulled"
+ resources = [ResourceType.DOWNLOAD, ResourceType.CACHE]
+
+ def process(self, element):
+ # returns whether an artifact was downloaded or not
+ if not element._pull():
+ raise SkipJob(self.action_name)
+
+ def status(self, element):
+ if not element._is_required():
+ # Artifact is not currently required but it may be requested later.
+ # Keep it in the queue.
+ return QueueStatus.WAIT
+
+ if not element._can_query_cache():
+ return QueueStatus.WAIT
+
+ if element._pull_pending():
+ return QueueStatus.READY
+ else:
+ return QueueStatus.SKIP
+
+ def done(self, _, element, result, status):
+
+ if status == JobStatus.FAIL:
+ return
+
+ element._pull_done()
+
+ # Build jobs will check the "approximate" size first. Since we
+ # do not get an artifact size from pull jobs, we have to
+ # actually check the cache size.
+ if status == JobStatus.OK:
+ self._scheduler.check_cache_size()
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
new file mode 100644
index 000000000..1efcffc16
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -0,0 +1,328 @@
+#
+# Copyright (C) 2016 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+
+# System imports
+import os
+from collections import deque
+from enum import Enum
+import traceback
+
+# Local imports
+from ..jobs import ElementJob, JobStatus
+from ..resources import ResourceType
+
+# BuildStream toplevel imports
+from ..._exceptions import BstError, set_last_task_error
+from ..._message import Message, MessageType
+
+
+# Queue status for a given element
+#
+#
+class QueueStatus(Enum):
+ # The element is waiting for dependencies.
+ WAIT = 1
+
+ # The element can skip this queue.
+ SKIP = 2
+
+ # The element is ready for processing in this queue.
+ READY = 3
+
+
+# Queue()
+#
+# Args:
+# scheduler (Scheduler): The Scheduler
+#
+class Queue():
+
+ # These should be overridden on class data of of concrete Queue implementations
+ action_name = None
+ complete_name = None
+ resources = [] # Resources this queues' jobs want
+
+ def __init__(self, scheduler):
+
+ #
+ # Public members
+ #
+ self.failed_elements = [] # List of failed elements, for the frontend
+ self.processed_elements = [] # List of processed elements, for the frontend
+ self.skipped_elements = [] # List of skipped elements, for the frontend
+
+ #
+ # Private members
+ #
+ self._scheduler = scheduler
+ self._resources = scheduler.resources # Shared resource pool
+ self._wait_queue = deque() # Ready / Waiting elements
+ self._done_queue = deque() # Processed / Skipped elements
+ self._max_retries = 0
+
+ # Assert the subclass has setup class data
+ assert self.action_name is not None
+ assert self.complete_name is not None
+
+ if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources:
+ self._max_retries = scheduler.context.sched_network_retries
+
+ #####################################################
+ # Abstract Methods for Queue implementations #
+ #####################################################
+
+ # process()
+ #
+ # Abstract method for processing an element
+ #
+ # Args:
+ # element (Element): An element to process
+ #
+ # Returns:
+ # (any): An optional something to be returned
+ # for every element successfully processed
+ #
+ #
+ def process(self, element):
+ pass
+
+ # status()
+ #
+ # Abstract method for reporting the status of an element.
+ #
+ # Args:
+ # element (Element): An element to process
+ #
+ # Returns:
+ # (QueueStatus): The element status
+ #
+ def status(self, element):
+ return QueueStatus.READY
+
+ # done()
+ #
+ # Abstract method for handling a successful job completion.
+ #
+ # Args:
+ # job (Job): The job which completed processing
+ # element (Element): The element which completed processing
+ # result (any): The return value of the process() implementation
+ # status (JobStatus): The return status of the Job
+ #
+ def done(self, job, element, result, status):
+ pass
+
+ #####################################################
+ # Scheduler / Pipeline facing APIs #
+ #####################################################
+
+ # enqueue()
+ #
+ # Enqueues some elements
+ #
+ # Args:
+ # elts (list): A list of Elements
+ #
+ def enqueue(self, elts):
+ if not elts:
+ return
+
+ # Place skipped elements on the done queue right away.
+ #
+ # The remaining ready and waiting elements must remain in the
+ # same queue, and ready status must be determined at the moment
+ # which the scheduler is asking for the next job.
+ #
+ skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
+ wait = [elt for elt in elts if elt not in skip]
+
+ self.skipped_elements.extend(skip) # Public record of skipped elements
+ self._done_queue.extend(skip) # Elements to be processed
+ self._wait_queue.extend(wait) # Elements eligible to be dequeued
+
+ # dequeue()
+ #
+ # A generator which dequeues the elements which
+ # are ready to exit the queue.
+ #
+ # Yields:
+ # (Element): Elements being dequeued
+ #
+ def dequeue(self):
+ while self._done_queue:
+ yield self._done_queue.popleft()
+
+ # dequeue_ready()
+ #
+ # Reports whether any elements can be promoted to other queues
+ #
+ # Returns:
+ # (bool): Whether there are elements ready
+ #
+ def dequeue_ready(self):
+ return any(self._done_queue)
+
+ # harvest_jobs()
+ #
+ # Process elements in the queue, moving elements which were enqueued
+ # into the dequeue pool, and creating as many jobs for which resources
+ # can be reserved.
+ #
+ # Returns:
+ # ([Job]): A list of jobs which can be run now
+ #
+ def harvest_jobs(self):
+ unready = []
+ ready = []
+
+ while self._wait_queue:
+ if not self._resources.reserve(self.resources, peek=True):
+ break
+
+ element = self._wait_queue.popleft()
+ status = self.status(element)
+
+ if status == QueueStatus.WAIT:
+ unready.append(element)
+ elif status == QueueStatus.SKIP:
+ self._done_queue.append(element)
+ self.skipped_elements.append(element)
+ else:
+ reserved = self._resources.reserve(self.resources)
+ assert reserved
+ ready.append(element)
+
+ self._wait_queue.extendleft(unready)
+
+ return [
+ ElementJob(self._scheduler, self.action_name,
+ self._element_log_path(element),
+ element=element, queue=self,
+ action_cb=self.process,
+ complete_cb=self._job_done,
+ max_retries=self._max_retries)
+ for element in ready
+ ]
+
+ #####################################################
+ # Private Methods #
+ #####################################################
+
+ # _update_workspaces()
+ #
+ # Updates and possibly saves the workspaces in the
+ # main data model in the main process after a job completes.
+ #
+ # Args:
+ # element (Element): The element which completed
+ # job (Job): The job which completed
+ #
+ def _update_workspaces(self, element, job):
+ workspace_dict = None
+ if job.child_data:
+ workspace_dict = job.child_data.get('workspace', None)
+
+ # Handle any workspace modifications now
+ #
+ if workspace_dict:
+ context = element._get_context()
+ workspaces = context.get_workspaces()
+ if workspaces.update_workspace(element._get_full_name(), workspace_dict):
+ try:
+ workspaces.save_config()
+ except BstError as e:
+ self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
+ except Exception as e: # pylint: disable=broad-except
+ self._message(element, MessageType.BUG,
+ "Unhandled exception while saving workspaces",
+ detail=traceback.format_exc())
+
+ # _job_done()
+ #
+ # A callback reported by the Job() when a job completes
+ #
+ # This will call the Queue implementation specific Queue.done()
+ # implementation and trigger the scheduler to reschedule.
+ #
+ # See the Job object for an explanation of the call signature
+ #
+ def _job_done(self, job, element, status, result):
+
+ # Now release the resources we reserved
+ #
+ self._resources.release(self.resources)
+
+ # Update values that need to be synchronized in the main task
+ # before calling any queue implementation
+ self._update_workspaces(element, job)
+
+ # Give the result of the job to the Queue implementor,
+ # and determine if it should be considered as processed
+ # or skipped.
+ try:
+ self.done(job, element, result, status)
+ except BstError as e:
+
+ # Report error and mark as failed
+ #
+ self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
+ self.failed_elements.append(element)
+
+ # Treat this as a task error as it's related to a task
+ # even though it did not occur in the task context
+ #
+ # This just allows us stronger testing capability
+ #
+ set_last_task_error(e.domain, e.reason)
+
+ except Exception as e: # pylint: disable=broad-except
+
+ # Report unhandled exceptions and mark as failed
+ #
+ self._message(element, MessageType.BUG,
+ "Unhandled exception in post processing",
+ detail=traceback.format_exc())
+ self.failed_elements.append(element)
+ else:
+ # All elements get placed on the done queue for later processing.
+ self._done_queue.append(element)
+
+ # These lists are for bookkeeping purposes for the UI and logging.
+ if status == JobStatus.SKIPPED or job.get_terminated():
+ self.skipped_elements.append(element)
+ elif status == JobStatus.OK:
+ self.processed_elements.append(element)
+ else:
+ self.failed_elements.append(element)
+
+ # Convenience wrapper for Queue implementations to send
+ # a message for the element they are processing
+ def _message(self, element, message_type, brief, **kwargs):
+ context = element._get_context()
+ message = Message(element._unique_id, message_type, brief, **kwargs)
+ context.message(message)
+
+ def _element_log_path(self, element):
+ project = element._get_project()
+ key = element._get_display_key()[1]
+ action = self.action_name.lower()
+ logfile = "{key}-{action}".format(key=key, action=action)
+
+ return os.path.join(project.name, element.normal_name, logfile)
diff --git a/src/buildstream/_scheduler/queues/sourcepushqueue.py b/src/buildstream/_scheduler/queues/sourcepushqueue.py
new file mode 100644
index 000000000..c38460e6a
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/sourcepushqueue.py
@@ -0,0 +1,42 @@
+#
+# Copyright (C) 2019 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
+
+from . import Queue, QueueStatus
+from ..resources import ResourceType
+from ..._exceptions import SkipJob
+
+
+# A queue which pushes staged sources
+#
+class SourcePushQueue(Queue):
+
+ action_name = "Src-push"
+ complete_name = "Sources pushed"
+ resources = [ResourceType.UPLOAD]
+
+ def process(self, element):
+ # Returns whether a source was pushed or not
+ if not element._source_push():
+ raise SkipJob(self.action_name)
+
+ def status(self, element):
+ if element._skip_source_push():
+ return QueueStatus.SKIP
+
+ return QueueStatus.READY
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py
new file mode 100644
index 000000000..72a79a532
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/trackqueue.py
@@ -0,0 +1,62 @@
+#
+# Copyright (C) 2016 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+
+# BuildStream toplevel imports
+from ...plugin import Plugin
+
+# Local imports
+from . import Queue, QueueStatus
+from ..resources import ResourceType
+from ..jobs import JobStatus
+
+
+# A queue which tracks sources
+#
+class TrackQueue(Queue):
+
+ action_name = "Track"
+ complete_name = "Tracked"
+ resources = [ResourceType.DOWNLOAD]
+
+ def process(self, element):
+ return element._track()
+
+ def status(self, element):
+ # We can skip elements entirely if they have no sources.
+ if not list(element.sources()):
+
+ # But we still have to mark them as tracked
+ element._tracking_done()
+ return QueueStatus.SKIP
+
+ return QueueStatus.READY
+
+ def done(self, _, element, result, status):
+
+ if status == JobStatus.FAIL:
+ return
+
+ # Set the new refs in the main process one by one as they complete,
+ # writing to bst files this time
+ for unique_id, new_ref in result:
+ source = Plugin._lookup(unique_id)
+ source._set_ref(new_ref, save=True)
+
+ element._tracking_done()
diff --git a/src/buildstream/_scheduler/resources.py b/src/buildstream/_scheduler/resources.py
new file mode 100644
index 000000000..73bf66b4a
--- /dev/null
+++ b/src/buildstream/_scheduler/resources.py
@@ -0,0 +1,166 @@
+class ResourceType():
+ CACHE = 0
+ DOWNLOAD = 1
+ PROCESS = 2
+ UPLOAD = 3
+
+
+class Resources():
+ def __init__(self, num_builders, num_fetchers, num_pushers):
+ self._max_resources = {
+ ResourceType.CACHE: 0,
+ ResourceType.DOWNLOAD: num_fetchers,
+ ResourceType.PROCESS: num_builders,
+ ResourceType.UPLOAD: num_pushers
+ }
+
+ # Resources jobs are currently using.
+ self._used_resources = {
+ ResourceType.CACHE: 0,
+ ResourceType.DOWNLOAD: 0,
+ ResourceType.PROCESS: 0,
+ ResourceType.UPLOAD: 0
+ }
+
+ # Resources jobs currently want exclusive access to. The set
+ # of jobs that have asked for exclusive access is the value -
+ # this is so that we can avoid scheduling any other jobs until
+ # *all* exclusive jobs that "register interest" have finished
+ # - which avoids starving them of scheduling time.
+ self._exclusive_resources = {
+ ResourceType.CACHE: set(),
+ ResourceType.DOWNLOAD: set(),
+ ResourceType.PROCESS: set(),
+ ResourceType.UPLOAD: set()
+ }
+
+ # reserve()
+ #
+ # Reserves a set of resources
+ #
+ # Args:
+ # resources (set): A set of ResourceTypes
+ # exclusive (set): Another set of ResourceTypes
+ # peek (bool): Whether to only peek at whether the resource is available
+ #
+ # Returns:
+ # (bool): True if the resources could be reserved
+ #
+ def reserve(self, resources, exclusive=None, *, peek=False):
+ if exclusive is None:
+ exclusive = set()
+
+ resources = set(resources)
+ exclusive = set(exclusive)
+
+ # First, we check if the job wants to access a resource that
+ # another job wants exclusive access to. If so, it cannot be
+ # scheduled.
+ #
+ # Note that if *both* jobs want this exclusively, we don't
+ # fail yet.
+ #
+ # FIXME: I *think* we can deadlock if two jobs want disjoint
+ # sets of exclusive and non-exclusive resources. This
+ # is currently not possible, but may be worth thinking
+ # about.
+ #
+ for resource in resources - exclusive:
+
+ # If our job wants this resource exclusively, we never
+ # check this, so we can get away with not (temporarily)
+ # removing it from the set.
+ if self._exclusive_resources[resource]:
+ return False
+
+ # Now we check if anything is currently using any resources
+ # this job wants exclusively. If so, the job cannot be
+ # scheduled.
+ #
+ # Since jobs that use a resource exclusively are also using
+ # it, this means only one exclusive job can ever be scheduled
+ # at a time, despite being allowed to be part of the exclusive
+ # set.
+ #
+ for resource in exclusive:
+ if self._used_resources[resource] != 0:
+ return False
+
+ # Finally, we check if we have enough of each resource
+ # available. If we don't have enough, the job cannot be
+ # scheduled.
+ for resource in resources:
+ if (self._max_resources[resource] > 0 and
+ self._used_resources[resource] >= self._max_resources[resource]):
+ return False
+
+ # Now we register the fact that our job is using the resources
+ # it asked for, and tell the scheduler that it is allowed to
+ # continue.
+ if not peek:
+ for resource in resources:
+ self._used_resources[resource] += 1
+
+ return True
+
+ # release()
+ #
+ # Release resources previously reserved with Resources.reserve()
+ #
+ # Args:
+ # resources (set): A set of resources to release
+ #
+ def release(self, resources):
+ for resource in resources:
+ assert self._used_resources[resource] > 0, "Scheduler resource imbalance"
+ self._used_resources[resource] -= 1
+
+ # register_exclusive_interest()
+ #
+ # Inform the resources pool that `source` has an interest in
+ # reserving this resource exclusively.
+ #
+ # The source parameter is used to identify the caller, it
+ # must be ensured to be unique for the time that the
+ # interest is registered.
+ #
+ # This function may be called multiple times, and subsequent
+ # calls will simply have no effect until clear_exclusive_interest()
+ # is used to clear the interest.
+ #
+ # This must be called in advance of reserve()
+ #
+ # Args:
+ # resources (set): Set of resources to reserve exclusively
+ # source (any): Source identifier, to be used again when unregistering
+ # the interest.
+ #
+ def register_exclusive_interest(self, resources, source):
+
+ # The very first thing we do is to register any exclusive
+ # resources this job may want. Even if the job is not yet
+ # allowed to run (because another job is holding the resource
+ # it wants), we can still set this - it just means that any
+ # job *currently* using these resources has to finish first,
+ # and no new jobs wanting these can be launched (except other
+ # exclusive-access jobs).
+ #
+ for resource in resources:
+ self._exclusive_resources[resource].add(source)
+
+ # unregister_exclusive_interest()
+ #
+ # Clear the exclusive interest in these resources.
+ #
+ # This should be called by the given source which registered
+ # an exclusive interest.
+ #
+ # Args:
+ # resources (set): Set of resources to reserve exclusively
+ # source (str): Source identifier, to be used again when unregistering
+ # the interest.
+ #
+ def unregister_exclusive_interest(self, resources, source):
+
+ for resource in resources:
+ self._exclusive_resources[resource].discard(source)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
new file mode 100644
index 000000000..50ad7f07a
--- /dev/null
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -0,0 +1,602 @@
+#
+# Copyright (C) 2016 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors:
+# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+
+# System imports
+import os
+import asyncio
+from itertools import chain
+import signal
+import datetime
+from contextlib import contextmanager
+
+# Local imports
+from .resources import Resources, ResourceType
+from .jobs import JobStatus, CacheSizeJob, CleanupJob
+from .._profile import Topics, PROFILER
+
+
+# A decent return code for Scheduler.run()
+class SchedStatus():
+ SUCCESS = 0
+ ERROR = -1
+ TERMINATED = 1
+
+
+# Some action names for the internal jobs we launch
+#
+_ACTION_NAME_CLEANUP = 'clean'
+_ACTION_NAME_CACHE_SIZE = 'size'
+
+
+# Scheduler()
+#
+# The scheduler operates on a list queues, each of which is meant to accomplish
+# a specific task. Elements enter the first queue when Scheduler.run() is called
+# and into the next queue when complete. Scheduler.run() returns when all of the
+# elements have been traversed or when an error occurs.
+#
+# Using the scheduler is a matter of:
+# a.) Deriving the Queue class and implementing its abstract methods
+# b.) Instantiating a Scheduler with one or more queues
+# c.) Calling Scheduler.run(elements) with a list of elements
+# d.) Fetching results from your queues
+#
+# Args:
+# context: The Context in the parent scheduling process
+# start_time: The time at which the session started
+# interrupt_callback: A callback to handle ^C
+# ticker_callback: A callback call once per second
+# job_start_callback: A callback call when each job starts
+# job_complete_callback: A callback call when each job completes
+#
+class Scheduler():
+
+ def __init__(self, context,
+ start_time,
+ interrupt_callback=None,
+ ticker_callback=None,
+ job_start_callback=None,
+ job_complete_callback=None):
+
+ #
+ # Public members
+ #
+ self.queues = None # Exposed for the frontend to print summaries
+ self.context = context # The Context object shared with Queues
+ self.terminated = False # Whether the scheduler was asked to terminate or has terminated
+ self.suspended = False # Whether the scheduler is currently suspended
+
+ # These are shared with the Job, but should probably be removed or made private in some way.
+ self.loop = None # Shared for Job access to observe the message queue
+ self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py
+
+ #
+ # Private members
+ #
+ self._active_jobs = [] # Jobs currently being run in the scheduler
+ self._starttime = start_time # Initial application start time
+ self._suspendtime = None # Session time compensation for suspended state
+ self._queue_jobs = True # Whether we should continue to queue jobs
+
+ # State of cache management related jobs
+ self._cache_size_scheduled = False # Whether we have a cache size job scheduled
+ self._cache_size_running = None # A running CacheSizeJob, or None
+ self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
+ self._cleanup_running = None # A running CleanupJob, or None
+
+ # Callbacks to report back to the Scheduler owner
+ self._interrupt_callback = interrupt_callback
+ self._ticker_callback = ticker_callback
+ self._job_start_callback = job_start_callback
+ self._job_complete_callback = job_complete_callback
+
+ # Whether our exclusive jobs, like 'cleanup' are currently already
+ # waiting or active.
+ #
+ # This is just a bit quicker than scanning the wait queue and active
+ # queue and comparing job action names.
+ #
+ self._exclusive_waiting = set()
+ self._exclusive_active = set()
+
+ self.resources = Resources(context.sched_builders,
+ context.sched_fetchers,
+ context.sched_pushers)
+
+ # run()
+ #
+ # Args:
+ # queues (list): A list of Queue objects
+ #
+ # Returns:
+ # (timedelta): The amount of time since the start of the session,
+ # discounting any time spent while jobs were suspended
+ # (SchedStatus): How the scheduling terminated
+ #
+ # Elements in the 'plan' will be processed by each
+ # queue in order. Processing will complete when all
+ # elements have been processed by each queue or when
+ # an error arises
+ #
+ def run(self, queues):
+
+ # Hold on to the queues to process
+ self.queues = queues
+
+ # Ensure that we have a fresh new event loop, in case we want
+ # to run another test in this thread.
+ self.loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(self.loop)
+
+ # Add timeouts
+ if self._ticker_callback:
+ self.loop.call_later(1, self._tick)
+
+ # Handle unix signals while running
+ self._connect_signals()
+
+ # Check if we need to start with some cache maintenance
+ self._check_cache_management()
+
+ # Start the profiler
+ with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
+ # Run the queues
+ self._sched()
+ self.loop.run_forever()
+ self.loop.close()
+
+ # Stop handling unix signals
+ self._disconnect_signals()
+
+ failed = any(any(queue.failed_elements) for queue in self.queues)
+ self.loop = None
+
+ if failed:
+ status = SchedStatus.ERROR
+ elif self.terminated:
+ status = SchedStatus.TERMINATED
+ else:
+ status = SchedStatus.SUCCESS
+
+ return self.elapsed_time(), status
+
+ # terminate_jobs()
+ #
+ # Forcefully terminates all ongoing jobs.
+ #
+ # For this to be effective, one needs to return to
+ # the scheduler loop first and allow the scheduler
+ # to complete gracefully.
+ #
+ # NOTE: This will block SIGINT so that graceful process
+ # termination is not interrupted, and SIGINT will
+ # remain blocked after Scheduler.run() returns.
+ #
+ def terminate_jobs(self):
+
+ # Set this right away, the frontend will check this
+ # attribute to decide whether or not to print status info
+ # etc and the following code block will trigger some callbacks.
+ self.terminated = True
+ self.loop.call_soon(self._terminate_jobs_real)
+
+ # Block this until we're finished terminating jobs,
+ # this will remain blocked forever.
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
+
+ # jobs_suspended()
+ #
+ # A context manager for running with jobs suspended
+ #
+ @contextmanager
+ def jobs_suspended(self):
+ self._disconnect_signals()
+ self._suspend_jobs()
+
+ yield
+
+ self._resume_jobs()
+ self._connect_signals()
+
+ # stop_queueing()
+ #
+ # Stop queueing additional jobs, causes Scheduler.run()
+ # to return once all currently processing jobs are finished.
+ #
+ def stop_queueing(self):
+ self._queue_jobs = False
+
+ # elapsed_time()
+ #
+ # Fetches the current session elapsed time
+ #
+ # Returns:
+ # (timedelta): The amount of time since the start of the session,
+ # discounting any time spent while jobs were suspended.
+ #
+ def elapsed_time(self):
+ timenow = datetime.datetime.now()
+ starttime = self._starttime
+ if not starttime:
+ starttime = timenow
+ return timenow - starttime
+
+ # job_completed():
+ #
+ # Called when a Job completes
+ #
+ # Args:
+ # queue (Queue): The Queue holding a complete job
+ # job (Job): The completed Job
+ # status (JobStatus): The status of the completed job
+ #
+ def job_completed(self, job, status):
+
+ # Remove from the active jobs list
+ self._active_jobs.remove(job)
+
+ # Scheduler owner facing callback
+ self._job_complete_callback(job, status)
+
+ # Now check for more jobs
+ self._sched()
+
+ # check_cache_size():
+ #
+ # Queues a cache size calculation job, after the cache
+ # size is calculated, a cleanup job will be run automatically
+ # if needed.
+ #
+ def check_cache_size(self):
+
+ # Here we assume we are called in response to a job
+ # completion callback, or before entering the scheduler.
+ #
+ # As such there is no need to call `_sched()` from here,
+ # and we prefer to run it once at the last moment.
+ #
+ self._cache_size_scheduled = True
+
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+
+ # _check_cache_management()
+ #
+ # Run an initial check if we need to lock the cache
+ # resource and check the size and possibly launch
+ # a cleanup.
+ #
+ # Sessions which do not add to the cache are not affected.
+ #
+ def _check_cache_management(self):
+
+ # Only trigger the check for a scheduler run which has
+ # queues which require the CACHE resource.
+ if not any(q for q in self.queues
+ if ResourceType.CACHE in q.resources):
+ return
+
+ # If the estimated size outgrows the quota, queue a job to
+ # actually check the real cache size initially, this one
+ # should have exclusive access to the cache to ensure nothing
+ # starts while we are checking the cache.
+ #
+ artifacts = self.context.artifactcache
+ if artifacts.full():
+ self._sched_cache_size_job(exclusive=True)
+
+ # _spawn_job()
+ #
+ # Spanws a job
+ #
+ # Args:
+ # job (Job): The job to spawn
+ #
+ def _spawn_job(self, job):
+ self._active_jobs.append(job)
+ if self._job_start_callback:
+ self._job_start_callback(job)
+ job.spawn()
+
+ # Callback for the cache size job
+ def _cache_size_job_complete(self, status, cache_size):
+
+ # Deallocate cache size job resources
+ self._cache_size_running = None
+ self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
+
+ # Unregister the exclusive interest if there was any
+ self.resources.unregister_exclusive_interest(
+ [ResourceType.CACHE], 'cache-size'
+ )
+
+ # Schedule a cleanup job if we've hit the threshold
+ if status != JobStatus.OK:
+ return
+
+ context = self.context
+ artifacts = context.artifactcache
+
+ if artifacts.full():
+ self._cleanup_scheduled = True
+
+ # Callback for the cleanup job
+ def _cleanup_job_complete(self, status, cache_size):
+
+ # Deallocate cleanup job resources
+ self._cleanup_running = None
+ self.resources.release([ResourceType.CACHE, ResourceType.PROCESS])
+
+ # Unregister the exclusive interest when we're done with it
+ if not self._cleanup_scheduled:
+ self.resources.unregister_exclusive_interest(
+ [ResourceType.CACHE], 'cache-cleanup'
+ )
+
+ # _sched_cleanup_job()
+ #
+ # Runs a cleanup job if one is scheduled to run now and
+ # sufficient recources are available.
+ #
+ def _sched_cleanup_job(self):
+
+ if self._cleanup_scheduled and self._cleanup_running is None:
+
+ # Ensure we have an exclusive interest in the resources
+ self.resources.register_exclusive_interest(
+ [ResourceType.CACHE], 'cache-cleanup'
+ )
+
+ if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
+ [ResourceType.CACHE]):
+
+ # Update state and launch
+ self._cleanup_scheduled = False
+ self._cleanup_running = \
+ CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
+ complete_cb=self._cleanup_job_complete)
+ self._spawn_job(self._cleanup_running)
+
+ # _sched_cache_size_job()
+ #
+ # Runs a cache size job if one is scheduled to run now and
+ # sufficient recources are available.
+ #
+ # Args:
+ # exclusive (bool): Run a cache size job immediately and
+ # hold the ResourceType.CACHE resource
+ # exclusively (used at startup).
+ #
+ def _sched_cache_size_job(self, *, exclusive=False):
+
+ # The exclusive argument is not intended (or safe) for arbitrary use.
+ if exclusive:
+ assert not self._cache_size_scheduled
+ assert not self._cache_size_running
+ assert not self._active_jobs
+ self._cache_size_scheduled = True
+
+ if self._cache_size_scheduled and not self._cache_size_running:
+
+ # Handle the exclusive launch
+ exclusive_resources = set()
+ if exclusive:
+ exclusive_resources.add(ResourceType.CACHE)
+ self.resources.register_exclusive_interest(
+ exclusive_resources, 'cache-size'
+ )
+
+ # Reserve the resources (with the possible exclusive cache resource)
+ if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS],
+ exclusive_resources):
+
+ # Update state and launch
+ self._cache_size_scheduled = False
+ self._cache_size_running = \
+ CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
+ 'cache_size/cache_size',
+ complete_cb=self._cache_size_job_complete)
+ self._spawn_job(self._cache_size_running)
+
+ # _sched_queue_jobs()
+ #
+ # Ask the queues what jobs they want to schedule and schedule
+ # them. This is done here so we can ask for new jobs when jobs
+ # from previous queues become available.
+ #
+ # This will process the Queues, pull elements through the Queues
+ # and process anything that is ready.
+ #
+ def _sched_queue_jobs(self):
+ ready = []
+ process_queues = True
+
+ while self._queue_jobs and process_queues:
+
+ # Pull elements forward through queues
+ elements = []
+ for queue in self.queues:
+ queue.enqueue(elements)
+ elements = list(queue.dequeue())
+
+ # Kickoff whatever processes can be processed at this time
+ #
+ # We start by queuing from the last queue first, because
+ # we want to give priority to queues later in the
+ # scheduling process in the case that multiple queues
+ # share the same token type.
+ #
+ # This avoids starvation situations where we dont move on
+ # to fetch tasks for elements which failed to pull, and
+ # thus need all the pulls to complete before ever starting
+ # a build
+ ready.extend(chain.from_iterable(
+ q.harvest_jobs() for q in reversed(self.queues)
+ ))
+
+ # harvest_jobs() may have decided to skip some jobs, making
+ # them eligible for promotion to the next queue as a side effect.
+ #
+ # If that happens, do another round.
+ process_queues = any(q.dequeue_ready() for q in self.queues)
+
+ # Spawn the jobs
+ #
+ for job in ready:
+ self._spawn_job(job)
+
+ # _sched()
+ #
+ # Run any jobs which are ready to run, or quit the main loop
+ # when nothing is running or is ready to run.
+ #
+ # This is the main driving function of the scheduler, it is called
+ # initially when we enter Scheduler.run(), and at the end of whenever
+ # any job completes, after any bussiness logic has occurred and before
+ # going back to sleep.
+ #
+ def _sched(self):
+
+ if not self.terminated:
+
+ #
+ # Try the cache management jobs
+ #
+ self._sched_cleanup_job()
+ self._sched_cache_size_job()
+
+ #
+ # Run as many jobs as the queues can handle for the
+ # available resources
+ #
+ self._sched_queue_jobs()
+
+ #
+ # If nothing is ticking then bail out
+ #
+ if not self._active_jobs:
+ self.loop.stop()
+
+ # _suspend_jobs()
+ #
+ # Suspend all ongoing jobs.
+ #
+ def _suspend_jobs(self):
+ if not self.suspended:
+ self._suspendtime = datetime.datetime.now()
+ self.suspended = True
+ for job in self._active_jobs:
+ job.suspend()
+
+ # _resume_jobs()
+ #
+ # Resume suspended jobs.
+ #
+ def _resume_jobs(self):
+ if self.suspended:
+ for job in self._active_jobs:
+ job.resume()
+ self.suspended = False
+ self._starttime += (datetime.datetime.now() - self._suspendtime)
+ self._suspendtime = None
+
+ # _interrupt_event():
+ #
+ # A loop registered event callback for keyboard interrupts
+ #
+ def _interrupt_event(self):
+
+ # FIXME: This should not be needed, but for some reason we receive an
+ # additional SIGINT event when the user hits ^C a second time
+ # to inform us that they really intend to terminate; even though
+ # we have disconnected our handlers at this time.
+ #
+ if self.terminated:
+ return
+
+ # Leave this to the frontend to decide, if no
+ # interrrupt callback was specified, then just terminate.
+ if self._interrupt_callback:
+ self._interrupt_callback()
+ else:
+ # Default without a frontend is just terminate
+ self.terminate_jobs()
+
+ # _terminate_event():
+ #
+ # A loop registered event callback for SIGTERM
+ #
+ def _terminate_event(self):
+ self.terminate_jobs()
+
+ # _suspend_event():
+ #
+ # A loop registered event callback for SIGTSTP
+ #
+ def _suspend_event(self):
+
+ # Ignore the feedback signals from Job.suspend()
+ if self.internal_stops:
+ self.internal_stops -= 1
+ return
+
+ # No need to care if jobs were suspended or not, we _only_ handle this
+ # while we know jobs are not suspended.
+ self._suspend_jobs()
+ os.kill(os.getpid(), signal.SIGSTOP)
+ self._resume_jobs()
+
+ # _connect_signals():
+ #
+ # Connects our signal handler event callbacks to the mainloop
+ #
+ def _connect_signals(self):
+ self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event)
+ self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event)
+ self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event)
+
+ def _disconnect_signals(self):
+ self.loop.remove_signal_handler(signal.SIGINT)
+ self.loop.remove_signal_handler(signal.SIGTSTP)
+ self.loop.remove_signal_handler(signal.SIGTERM)
+
+ def _terminate_jobs_real(self):
+ # 20 seconds is a long time, it can take a while and sometimes
+ # we still fail, need to look deeper into this again.
+ wait_start = datetime.datetime.now()
+ wait_limit = 20.0
+
+ # First tell all jobs to terminate
+ for job in self._active_jobs:
+ job.terminate()
+
+ # Now wait for them to really terminate
+ for job in self._active_jobs:
+ elapsed = datetime.datetime.now() - wait_start
+ timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
+ if not job.terminate_wait(timeout):
+ job.kill()
+
+ # Regular timeout for driving status in the UI
+ def _tick(self):
+ elapsed = self.elapsed_time()
+ self._ticker_callback(elapsed)
+ self.loop.call_later(1, self._tick)