diff options
author | Chandan Singh <chandan@chandansingh.net> | 2019-05-21 17:08:31 +0000 |
---|---|---|
committer | Chandan Singh <chandan@chandansingh.net> | 2019-05-21 17:08:31 +0000 |
commit | 25172ed2d7b39cab799b1f2788d818d39ce3ee33 (patch) | |
tree | 423acdd8368649ced4c0fe7a4b92e586a13308a2 /src/buildstream/_scheduler | |
parent | 6c59e7901a52be961c2a1b671cf2b30f90bc4d0a (diff) | |
parent | 488b1f71c32bdae1d46b0aa1f07c82df0f0e53e2 (diff) | |
download | buildstream-25172ed2d7b39cab799b1f2788d818d39ce3ee33.tar.gz |
Merge branch 'chandan/src-directory' into 'master'
Move source from 'buildstream' to 'src/buildstream'
Closes #1009
See merge request BuildStream/buildstream!1322
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r-- | src/buildstream/_scheduler/__init__.py | 30 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/__init__.py | 23 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/cachesizejob.py | 41 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/cleanupjob.py | 50 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/elementjob.py | 115 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 682 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/__init__.py | 1 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/artifactpushqueue.py | 44 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/buildqueue.py | 117 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/fetchqueue.py | 80 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/pullqueue.py | 66 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 328 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/sourcepushqueue.py | 42 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/trackqueue.py | 62 | ||||
-rw-r--r-- | src/buildstream/_scheduler/resources.py | 166 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 602 |
16 files changed, 2449 insertions, 0 deletions
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py new file mode 100644 index 000000000..d2f458fa5 --- /dev/null +++ b/src/buildstream/_scheduler/__init__.py @@ -0,0 +1,30 @@ +# +# Copyright (C) 2017 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> + +from .queues import Queue, QueueStatus + +from .queues.fetchqueue import FetchQueue +from .queues.sourcepushqueue import SourcePushQueue +from .queues.trackqueue import TrackQueue +from .queues.buildqueue import BuildQueue +from .queues.artifactpushqueue import ArtifactPushQueue +from .queues.pullqueue import PullQueue + +from .scheduler import Scheduler, SchedStatus +from .jobs import ElementJob, JobStatus diff --git a/src/buildstream/_scheduler/jobs/__init__.py b/src/buildstream/_scheduler/jobs/__init__.py new file mode 100644 index 000000000..3e213171a --- /dev/null +++ b/src/buildstream/_scheduler/jobs/__init__.py @@ -0,0 +1,23 @@ +# +# Copyright (C) 2018 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. 
+# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Tristan Maat <tristan.maat@codethink.co.uk> + +from .elementjob import ElementJob +from .cachesizejob import CacheSizeJob +from .cleanupjob import CleanupJob +from .job import JobStatus diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py new file mode 100644 index 000000000..5f27b7fc1 --- /dev/null +++ b/src/buildstream/_scheduler/jobs/cachesizejob.py @@ -0,0 +1,41 @@ +# Copyright (C) 2018 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. 
+# +# Author: +# Tristan Daniël Maat <tristan.maat@codethink.co.uk> +# +from .job import Job, JobStatus + + +class CacheSizeJob(Job): + def __init__(self, *args, complete_cb, **kwargs): + super().__init__(*args, **kwargs) + self._complete_cb = complete_cb + + context = self._scheduler.context + self._casquota = context.get_casquota() + + def child_process(self): + return self._casquota.compute_cache_size() + + def parent_complete(self, status, result): + if status == JobStatus.OK: + self._casquota.set_cache_size(result) + + if self._complete_cb: + self._complete_cb(status, result) + + def child_process_data(self): + return {} diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py new file mode 100644 index 000000000..4764b30b3 --- /dev/null +++ b/src/buildstream/_scheduler/jobs/cleanupjob.py @@ -0,0 +1,50 @@ +# Copyright (C) 2018 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. 
+# +# Author: +# Tristan Daniël Maat <tristan.maat@codethink.co.uk> +# +from .job import Job, JobStatus + + +class CleanupJob(Job): + def __init__(self, *args, complete_cb, **kwargs): + super().__init__(*args, **kwargs) + self._complete_cb = complete_cb + + context = self._scheduler.context + self._casquota = context.get_casquota() + + def child_process(self): + def progress(): + self.send_message('update-cache-size', + self._casquota.get_cache_size()) + return self._casquota.clean(progress) + + def handle_message(self, message_type, message): + # Update the cache size in the main process as we go, + # this provides better feedback in the UI. + if message_type == 'update-cache-size': + self._casquota.set_cache_size(message, write_to_disk=False) + return True + + return False + + def parent_complete(self, status, result): + if status == JobStatus.OK: + self._casquota.set_cache_size(result, write_to_disk=False) + + if self._complete_cb: + self._complete_cb(status, result) diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py new file mode 100644 index 000000000..fb5d38e11 --- /dev/null +++ b/src/buildstream/_scheduler/jobs/elementjob.py @@ -0,0 +1,115 @@ +# Copyright (C) 2018 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. 
+# +# Author: +# Tristan Daniël Maat <tristan.maat@codethink.co.uk> +# +from ruamel import yaml + +from ..._message import Message, MessageType + +from .job import Job + + +# ElementJob() +# +# A job to run an element's commands. When this job is spawned +# `action_cb` will be called, and when it completes `complete_cb` will +# be called. +# +# Args: +# scheduler (Scheduler): The scheduler +# action_name (str): The queue action name +# max_retries (int): The maximum number of retries +# action_cb (callable): The function to execute on the child +# complete_cb (callable): The function to execute when the job completes +# element (Element): The element to work on +# kwargs: Remaining Job() constructor arguments +# +# Here is the calling signature of the action_cb: +# +# action_cb(): +# +# This function will be called in the child task +# +# Args: +# element (Element): The element passed to the Job() constructor +# +# Returns: +# (object): Any abstract simple python object, including a string, int, +# bool, list or dict, this must be a simple serializable object. 
+# +# Here is the calling signature of the complete_cb: +# +# complete_cb(): +# +# This function will be called when the child task completes +# +# Args: +# job (Job): The job object which completed +# element (Element): The element passed to the Job() constructor +# status (JobStatus): The status of whether the workload raised an exception +# result (object): The deserialized object returned by the `action_cb`, or None +# if `success` is False +# +class ElementJob(Job): + def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs): + super().__init__(*args, **kwargs) + self.queue = queue + self._element = element + self._action_cb = action_cb # The action callable function + self._complete_cb = complete_cb # The complete callable function + + # Set the task wide ID for logging purposes + self.set_task_id(element._unique_id) + + @property + def element(self): + return self._element + + def child_process(self): + + # Print the element's environment at the beginning of any element's log file. 
+ # + # This should probably be omitted for non-build tasks but it's harmless here + elt_env = self._element.get_environment() + env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True) + self.message(MessageType.LOG, + "Build environment for element {}".format(self._element.name), + detail=env_dump) + + # Run the action + return self._action_cb(self._element) + + def parent_complete(self, status, result): + self._complete_cb(self, self._element, status, self._result) + + def message(self, message_type, message, **kwargs): + args = dict(kwargs) + args['scheduler'] = True + self._scheduler.context.message( + Message(self._element._unique_id, + message_type, + message, + **args)) + + def child_process_data(self): + data = {} + + workspace = self._element._get_workspace() + if workspace is not None: + data['workspace'] = workspace.to_dict() + + return data diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py new file mode 100644 index 000000000..dd91d1634 --- /dev/null +++ b/src/buildstream/_scheduler/jobs/job.py @@ -0,0 +1,682 @@ +# +# Copyright (C) 2018 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. 
+# +# Authors: +# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> +# Jürg Billeter <juerg.billeter@codethink.co.uk> +# Tristan Maat <tristan.maat@codethink.co.uk> + +# System imports +import os +import sys +import signal +import datetime +import traceback +import asyncio +import multiprocessing + +# BuildStream toplevel imports +from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob +from ..._message import Message, MessageType, unconditional_messages +from ... import _signals, utils + +# Return code values shutdown of job handling child processes +# +RC_OK = 0 +RC_FAIL = 1 +RC_PERM_FAIL = 2 +RC_SKIPPED = 3 + + +# JobStatus: +# +# The job completion status, passed back through the +# complete callbacks. +# +class JobStatus(): + # Job succeeded + OK = 0 + + # A temporary BstError was raised + FAIL = 1 + + # A SkipJob was raised + SKIPPED = 3 + + +# Used to distinguish between status messages and return values +class _Envelope(): + def __init__(self, message_type, message): + self.message_type = message_type + self.message = message + + +# Process class that doesn't call waitpid on its own. +# This prevents conflicts with the asyncio child watcher. +class Process(multiprocessing.Process): + # pylint: disable=attribute-defined-outside-init + def start(self): + self._popen = self._Popen(self) + self._sentinel = self._popen.sentinel + + +# Job() +# +# The Job object represents a parallel task, when calling Job.spawn(), +# the given `Job.child_process()` will be called in parallel to the +# calling process, and `Job.parent_complete()` will be called with the +# action result in the calling process when the job completes. +# +# Args: +# scheduler (Scheduler): The scheduler +# action_name (str): The queue action name +# logfile (str): A template string that points to the logfile +# that should be used - should contain {pid}. 
+# max_retries (int): The maximum number of retries +# +class Job(): + + def __init__(self, scheduler, action_name, logfile, *, max_retries=0): + + # + # Public members + # + self.action_name = action_name # The action name for the Queue + self.child_data = None # Data to be sent to the main process + + # + # Private members + # + self._scheduler = scheduler # The scheduler + self._queue = None # A message passing queue + self._process = None # The Process object + self._watcher = None # Child process watcher + self._listening = False # Whether the parent is currently listening + self._suspended = False # Whether this job is currently suspended + self._max_retries = max_retries # Maximum number of automatic retries + self._result = None # Return value of child action in the parent + self._tries = 0 # Try count, for retryable jobs + self._terminated = False # Whether this job has been explicitly terminated + + self._logfile = logfile + self._task_id = None + + # spawn() + # + # Spawns the job. + # + def spawn(self): + + self._queue = multiprocessing.Queue() + + self._tries += 1 + self._parent_start_listening() + + # Spawn the process + self._process = Process(target=self._child_action, args=[self._queue]) + + # Block signals which are handled in the main process such that + # the child process does not inherit the parent's state, but the main + # process will be notified of any signal after we launch the child. + # + with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False): + self._process.start() + + # Wait for the child task to complete. + # + # This is a tricky part of python which doesn't seem to + # make it to the online docs: + # + # o asyncio.get_child_watcher() will return a SafeChildWatcher() instance + # which is the default type of watcher, and the instance belongs to the + # "event loop policy" in use (so there is only one in the main process). 
+ # + # o SafeChildWatcher() will register a SIGCHLD handler with the asyncio + # loop, and will selectively reap any child pids which have been + # terminated. + # + # o At registration time, the process will immediately be checked with + # `os.waitpid()` and will be reaped immediately, before add_child_handler() + # returns. + # + # The self._parent_child_completed callback passed here will normally + # be called after the child task has been reaped with `os.waitpid()`, in + # an event loop callback. Otherwise, if the job completes too fast, then + # the callback is called immediately. + # + self._watcher = asyncio.get_child_watcher() + self._watcher.add_child_handler(self._process.pid, self._parent_child_completed) + + # terminate() + # + # Politely request that an ongoing job terminate soon. + # + # This will send a SIGTERM signal to the Job process. + # + def terminate(self): + + # First resume the job if it's suspended + self.resume(silent=True) + + self.message(MessageType.STATUS, "{} terminating".format(self.action_name)) + + # Make sure there is no garbage on the queue + self._parent_stop_listening() + + # Terminate the process using multiprocessing API pathway + self._process.terminate() + + self._terminated = True + + # get_terminated() + # + # Check if a job has been terminated. + # + # Returns: + # (bool): True in the main process if Job.terminate() was called. + # + def get_terminated(self): + return self._terminated + + # terminate_wait() + # + # Wait for terminated jobs to complete + # + # Args: + # timeout (float): Seconds to wait + # + # Returns: + # (bool): True if the process terminated cleanly, otherwise False + # + def terminate_wait(self, timeout): + + # Join the child process after sending SIGTERM + self._process.join(timeout) + return self._process.exitcode is not None + + # kill() + # + # Forcefully kill the process, and any children it might have. 
+ # + def kill(self): + # Force kill + self.message(MessageType.WARN, + "{} did not terminate gracefully, killing".format(self.action_name)) + utils._kill_process_tree(self._process.pid) + + # suspend() + # + # Suspend this job. + # + def suspend(self): + if not self._suspended: + self.message(MessageType.STATUS, + "{} suspending".format(self.action_name)) + + try: + # Use SIGTSTP so that child processes may handle and propagate + # it to processes they spawn that become session leaders + os.kill(self._process.pid, signal.SIGTSTP) + + # For some reason we receive exactly one suspend event for every + # SIGTSTP we send to the child fork(), even though the child forks + # are setsid(). We keep a count of these so we can ignore them + # in our event loop suspend_event() + self._scheduler.internal_stops += 1 + self._suspended = True + except ProcessLookupError: + # ignore, process has already exited + pass + + # resume() + # + # Resume this suspended job. + # + def resume(self, silent=False): + if self._suspended: + if not silent and not self._scheduler.terminated: + self.message(MessageType.STATUS, + "{} resuming".format(self.action_name)) + + os.kill(self._process.pid, signal.SIGCONT) + self._suspended = False + + # set_task_id() + # + # This is called by Job subclasses to set a plugin ID + # associated with the task at large (if any element is related + # to the task). + # + # The task ID helps keep messages in the frontend coherent + # in the case that multiple plugins log in the context of + # a single task (e.g. running integration commands should appear + # in the frontend for the element being built, not the element + # running the integration commands). + # + # Args: + # task_id (int): The plugin identifier for this task + # + def set_task_id(self, task_id): + self._task_id = task_id + + # send_message() + # + # To be called from inside Job.child_process() implementations + # to send messages to the main process during processing. 
+ # + # These messages will be processed by the class's Job.handle_message() + # implementation. + # + def send_message(self, message_type, message): + self._queue.put(_Envelope(message_type, message)) + + ####################################################### + # Abstract Methods # + ####################################################### + + # handle_message() + # + # Handle a custom message. This will be called in the main process in + # response to any messages sent to the main process using the + # Job.send_message() API from inside a Job.child_process() implementation + # + # Args: + # message_type (str): A string to identify the message type + # message (any): A simple serializable object + # + # Returns: + # (bool): Should return a truthy value if message_type is handled. + # + def handle_message(self, message_type, message): + return False + + # parent_complete() + # + # This will be executed after the job finishes, and is expected to + # pass the result to the main thread. + # + # Args: + # status (JobStatus): The job exit status + # result (any): The result returned by child_process(). + # + def parent_complete(self, status, result): + raise ImplError("Job '{kind}' does not implement parent_complete()" + .format(kind=type(self).__name__)) + + # child_process() + # + # This will be executed after fork(), and is intended to perform + # the job's task. + # + # Returns: + # (any): A (simple!) object to be returned to the main thread + # as the result. + # + def child_process(self): + raise ImplError("Job '{kind}' does not implement child_process()" + .format(kind=type(self).__name__)) + + # message(): + # + # Logs a message, this will be logged in the task's logfile and + # conditionally also be sent to the frontend. 
+ # + # Args: + # message_type (MessageType): The type of message to send + # message (str): The message + # kwargs: Remaining Message() constructor arguments + # + def message(self, message_type, message, **kwargs): + args = dict(kwargs) + args['scheduler'] = True + self._scheduler.context.message(Message(None, message_type, message, **args)) + + # child_process_data() + # + # Abstract method to retrieve additional data that should be + # returned to the parent process. Note that the job result is + # retrieved independently. + # + # Values can later be retrieved in Job.child_data. + # + # Returns: + # (dict) A dict containing values to be reported to the main process + # + def child_process_data(self): + return {} + + ####################################################### + # Local Private Methods # + ####################################################### + # + # Methods prefixed with the word 'child' take place in the child process + # + # Methods prefixed with the word 'parent' take place in the parent process + # + # Other methods can be called in both child or parent processes + # + ####################################################### + + # _child_action() + # + # Perform the action in the child process, this calls the action_cb. + # + # Args: + # queue (multiprocessing.Queue): The message queue for IPC + # + def _child_action(self, queue): + + # This avoids some SIGTSTP signals from grandchildren + # getting propagated up to the master process + os.setsid() + + # First set back to the default signal handlers for the signals + # we handle, and then clear their blocked state. 
+ # + signal_list = [signal.SIGTSTP, signal.SIGTERM] + for sig in signal_list: + signal.signal(sig, signal.SIG_DFL) + signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list) + + # Assign the queue we passed across the process boundaries + # + # Set the global message handler in this child + # process to forward messages to the parent process + self._queue = queue + self._scheduler.context.set_message_handler(self._child_message_handler) + + starttime = datetime.datetime.now() + stopped_time = None + + def stop_time(): + nonlocal stopped_time + stopped_time = datetime.datetime.now() + + def resume_time(): + nonlocal stopped_time + nonlocal starttime + starttime += (datetime.datetime.now() - stopped_time) + + # Time, log and run the action function + # + with _signals.suspendable(stop_time, resume_time), \ + self._scheduler.context.recorded_messages(self._logfile) as filename: + + self.message(MessageType.START, self.action_name, logfile=filename) + + try: + # Try the task action + result = self.child_process() # pylint: disable=assignment-from-no-return + except SkipJob as e: + elapsed = datetime.datetime.now() - starttime + self.message(MessageType.SKIPPED, str(e), + elapsed=elapsed, logfile=filename) + + # Alert parent of skip by return code + self._child_shutdown(RC_SKIPPED) + except BstError as e: + elapsed = datetime.datetime.now() - starttime + retry_flag = e.temporary + + if retry_flag and (self._tries <= self._max_retries): + self.message(MessageType.FAIL, + "Try #{} failed, retrying".format(self._tries), + elapsed=elapsed, logfile=filename) + else: + self.message(MessageType.FAIL, str(e), + elapsed=elapsed, detail=e.detail, + logfile=filename, sandbox=e.sandbox) + + self._queue.put(_Envelope('child_data', self.child_process_data())) + + # Report the exception to the parent (for internal testing purposes) + self._child_send_error(e) + + # Set return code based on whether or not the error was temporary. 
+ # + self._child_shutdown(RC_FAIL if retry_flag else RC_PERM_FAIL) + + except Exception as e: # pylint: disable=broad-except + + # If an unhandled exception (not normalized to BstError) occurs, that's a bug, + # send the traceback and formatted exception back to the frontend + # and print it to the log file. + # + elapsed = datetime.datetime.now() - starttime + detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc()) + + self.message(MessageType.BUG, self.action_name, + elapsed=elapsed, detail=detail, + logfile=filename) + # Unhandled exceptions should permanently fail + self._child_shutdown(RC_PERM_FAIL) + + else: + # No exception occurred in the action + self._queue.put(_Envelope('child_data', self.child_process_data())) + self._child_send_result(result) + + elapsed = datetime.datetime.now() - starttime + self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, + logfile=filename) + + # Shutdown needs to stay outside of the above context manager, + # make sure we don't try to handle SIGTERM while the process + # is already busy in sys.exit() + self._child_shutdown(RC_OK) + + # _child_send_error() + # + # Sends an error to the main process through the message queue + # + # Args: + # e (Exception): The error to send + # + def _child_send_error(self, e): + domain = None + reason = None + + if isinstance(e, BstError): + domain = e.domain + reason = e.reason + + envelope = _Envelope('error', { + 'domain': domain, + 'reason': reason + }) + self._queue.put(envelope) + + # _child_send_result() + # + # Sends the serialized result to the main process through the message queue + # + # Args: + # result (object): A simple serializable object, or None + # + # Note: If None is passed here, nothing needs to be sent, the + # result member in the parent process will simply remain None. 
+ # + def _child_send_result(self, result): + if result is not None: + envelope = _Envelope('result', result) + self._queue.put(envelope) + + # _child_shutdown() + # + # Shuts down the child process by cleaning up and exiting the process + # + # Args: + # exit_code (int): The exit code to exit with + # + def _child_shutdown(self, exit_code): + self._queue.close() + sys.exit(exit_code) + + # _child_message_handler() + # + # A Context delegate for handling messages, this replaces the + # frontend's main message handler in the context of a child task + # and performs local logging to the local log file before sending + # the message back to the parent process for further propagation. + # + # Args: + # message (Message): The message to log + # context (Context): The context object delegating this message + # + def _child_message_handler(self, message, context): + + message.action_name = self.action_name + message.task_id = self._task_id + + # Send to frontend if appropriate + if context.silent_messages() and (message.message_type not in unconditional_messages): + return + + if message.message_type == MessageType.LOG: + return + + self._queue.put(_Envelope('message', message)) + + # _parent_shutdown() + # + # Shuts down the Job on the parent side by reading any remaining + # messages on the message queue and cleaning up any resources. + # + def _parent_shutdown(self): + # Make sure we've read everything we need and then stop listening + self._parent_process_queue() + self._parent_stop_listening() + + # _parent_child_completed() + # + # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler() + # + # Args: + # pid (int): The PID of the child which completed + # returncode (int): The return code of the child process + # + def _parent_child_completed(self, pid, returncode): + self._parent_shutdown() + + # We don't want to retry if we got OK or a permanent fail. 
+ retry_flag = returncode == RC_FAIL + + if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: + self.spawn() + return + + # Resolve the outward facing overall job completion status + # + if returncode == RC_OK: + status = JobStatus.OK + elif returncode == RC_SKIPPED: + status = JobStatus.SKIPPED + elif returncode in (RC_FAIL, RC_PERM_FAIL): + status = JobStatus.FAIL + else: + status = JobStatus.FAIL + + self.parent_complete(status, self._result) + self._scheduler.job_completed(self, status) + + # Force the deletion of the queue and process objects to try and clean up FDs + self._queue = self._process = None + + # _parent_process_envelope() + # + # Processes a message Envelope deserialized from the message queue. + # + # This will have the side effect of assigning some local state + # on the Job in the parent process for later inspection when the + # child process completes. + # + # Args: + # envelope (Envelope): The message envelope + # + def _parent_process_envelope(self, envelope): + if not self._listening: + return + + if envelope.message_type == 'message': + # Propagate received messages from children + # back through the context. 
+ self._scheduler.context.message(envelope.message) + elif envelope.message_type == 'error': + # For regression tests only, save the last error domain / reason + # reported from a child task in the main process, this global state + # is currently managed in _exceptions.py + set_last_task_error(envelope.message['domain'], + envelope.message['reason']) + elif envelope.message_type == 'result': + assert self._result is None + self._result = envelope.message + elif envelope.message_type == 'child_data': + # If we retry a job, we assign a new value to this + self.child_data = envelope.message + + # Try Job subclass specific messages now + elif not self.handle_message(envelope.message_type, + envelope.message): + assert 0, "Unhandled message type '{}': {}" \ + .format(envelope.message_type, envelope.message) + + # _parent_process_queue() + # + # Reads back message envelopes from the message queue + # in the parent process. + # + def _parent_process_queue(self): + while not self._queue.empty(): + envelope = self._queue.get_nowait() + self._parent_process_envelope(envelope) + + # _parent_recv() + # + # A callback to handle I/O events from the message + # queue file descriptor in the main process message loop + # + def _parent_recv(self, *args): + self._parent_process_queue() + + # _parent_start_listening() + # + # Starts listening on the message queue + # + def _parent_start_listening(self): + # Warning: Platform specific code up ahead + # + # The multiprocessing.Queue object does not tell us how + # to receive io events in the receiving process, so we + # need to sneak in and get its file descriptor. 
+ # + # The _reader member of the Queue is currently private + # but well known, perhaps it will become public: + # + # http://bugs.python.org/issue3831 + # + if not self._listening: + self._scheduler.loop.add_reader( + self._queue._reader.fileno(), self._parent_recv) + self._listening = True + + # _parent_stop_listening() + # + # Stops listening on the message queue + # + def _parent_stop_listening(self): + if self._listening: + self._scheduler.loop.remove_reader(self._queue._reader.fileno()) + self._listening = False diff --git a/src/buildstream/_scheduler/queues/__init__.py b/src/buildstream/_scheduler/queues/__init__.py new file mode 100644 index 000000000..3b2293919 --- /dev/null +++ b/src/buildstream/_scheduler/queues/__init__.py @@ -0,0 +1 @@ +from .queue import Queue, QueueStatus diff --git a/src/buildstream/_scheduler/queues/artifactpushqueue.py b/src/buildstream/_scheduler/queues/artifactpushqueue.py new file mode 100644 index 000000000..b861d4fc7 --- /dev/null +++ b/src/buildstream/_scheduler/queues/artifactpushqueue.py @@ -0,0 +1,44 @@ +# +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> +# Jürg Billeter <juerg.billeter@codethink.co.uk> + +# Local imports +from . 
import Queue, QueueStatus +from ..resources import ResourceType +from ..._exceptions import SkipJob + + +# A queue which pushes element artifacts +# +class ArtifactPushQueue(Queue): + + action_name = "Push" + complete_name = "Pushed" + resources = [ResourceType.UPLOAD] + + def process(self, element): + # returns whether an artifact was uploaded or not + if not element._push(): + raise SkipJob(self.action_name) + + def status(self, element): + if element._skip_push(): + return QueueStatus.SKIP + + return QueueStatus.READY diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py new file mode 100644 index 000000000..aa489f381 --- /dev/null +++ b/src/buildstream/_scheduler/queues/buildqueue.py @@ -0,0 +1,117 @@ +# +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> +# Jürg Billeter <juerg.billeter@codethink.co.uk> + +from datetime import timedelta + +from . 
import Queue, QueueStatus +from ..jobs import ElementJob, JobStatus +from ..resources import ResourceType +from ..._message import MessageType + + +# A queue which assembles elements +# +class BuildQueue(Queue): + + action_name = "Build" + complete_name = "Built" + resources = [ResourceType.PROCESS, ResourceType.CACHE] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._tried = set() + + def enqueue(self, elts): + to_queue = [] + + for element in elts: + if not element._cached_failure() or element in self._tried: + to_queue.append(element) + continue + + # XXX: Fix this, See https://mail.gnome.org/archives/buildstream-list/2018-September/msg00029.html + # Bypass queue processing entirely the first time it's tried. + self._tried.add(element) + _, description, detail = element._get_build_result() + logfile = element._get_build_log() + self._message(element, MessageType.FAIL, description, + detail=detail, action_name=self.action_name, + elapsed=timedelta(seconds=0), + logfile=logfile) + job = ElementJob(self._scheduler, self.action_name, + logfile, element=element, queue=self, + action_cb=self.process, + complete_cb=self._job_done, + max_retries=self._max_retries) + self._done_queue.append(element) + self.failed_elements.append(element) + self._scheduler._job_complete_callback(job, False) + + return super().enqueue(to_queue) + + def process(self, element): + return element._assemble() + + def status(self, element): + if not element._is_required(): + # Artifact is not currently required but it may be requested later. + # Keep it in the queue. 
+ return QueueStatus.WAIT + + if element._cached_success(): + return QueueStatus.SKIP + + if not element._buildable(): + return QueueStatus.WAIT + + return QueueStatus.READY + + def _check_cache_size(self, job, element, artifact_size): + + # After completing a build job, add the artifact size + # as returned from Element._assemble() to the estimated + # artifact cache size + # + context = self._scheduler.context + artifacts = context.artifactcache + + artifacts.add_artifact_size(artifact_size) + + # If the estimated size outgrows the quota, ask the scheduler + # to queue a job to actually check the real cache size. + # + if artifacts.full(): + self._scheduler.check_cache_size() + + def done(self, job, element, result, status): + + # Inform element in main process that assembly is done + element._assemble_done() + + # This has to be done after _assemble_done, such that the + # element may register its cache key as required + # + # FIXME: Element._assemble() does not report both the failure state and the + # size of the newly cached failed artifact, so we can only adjust the + # artifact cache size for a successful build even though we know a + # failed build also grows the artifact cache size. + # + if status == JobStatus.OK: + self._check_cache_size(job, element, result) diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py new file mode 100644 index 000000000..9edeebb1d --- /dev/null +++ b/src/buildstream/_scheduler/queues/fetchqueue.py @@ -0,0 +1,80 @@ +# +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. 
+# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> +# Jürg Billeter <juerg.billeter@codethink.co.uk> + +# BuildStream toplevel imports +from ... import Consistency + +# Local imports +from . import Queue, QueueStatus +from ..resources import ResourceType +from ..jobs import JobStatus + + +# A queue which fetches element sources +# +class FetchQueue(Queue): + + action_name = "Fetch" + complete_name = "Fetched" + resources = [ResourceType.DOWNLOAD] + + def __init__(self, scheduler, skip_cached=False, fetch_original=False): + super().__init__(scheduler) + + self._skip_cached = skip_cached + self._fetch_original = fetch_original + + def process(self, element): + element._fetch(fetch_original=self._fetch_original) + + def status(self, element): + if not element._is_required(): + # Artifact is not currently required but it may be requested later. + # Keep it in the queue. + return QueueStatus.WAIT + + # Optionally skip elements that are already in the artifact cache + if self._skip_cached: + if not element._can_query_cache(): + return QueueStatus.WAIT + + if element._cached(): + return QueueStatus.SKIP + + # This will automatically skip elements which + # have no sources. 
+ + if not element._should_fetch(self._fetch_original): + return QueueStatus.SKIP + + return QueueStatus.READY + + def done(self, _, element, result, status): + + if status == JobStatus.FAIL: + return + + element._fetch_done() + + # Successful fetch, we must be CACHED or in the sourcecache + if self._fetch_original: + assert element._get_consistency() == Consistency.CACHED + else: + assert element._source_cached() diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py new file mode 100644 index 000000000..013ee6489 --- /dev/null +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -0,0 +1,66 @@ +# +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> +# Jürg Billeter <juerg.billeter@codethink.co.uk> + +# Local imports +from . 
import Queue, QueueStatus +from ..resources import ResourceType +from ..jobs import JobStatus +from ..._exceptions import SkipJob + + +# A queue which pulls element artifacts +# +class PullQueue(Queue): + + action_name = "Pull" + complete_name = "Pulled" + resources = [ResourceType.DOWNLOAD, ResourceType.CACHE] + + def process(self, element): + # returns whether an artifact was downloaded or not + if not element._pull(): + raise SkipJob(self.action_name) + + def status(self, element): + if not element._is_required(): + # Artifact is not currently required but it may be requested later. + # Keep it in the queue. + return QueueStatus.WAIT + + if not element._can_query_cache(): + return QueueStatus.WAIT + + if element._pull_pending(): + return QueueStatus.READY + else: + return QueueStatus.SKIP + + def done(self, _, element, result, status): + + if status == JobStatus.FAIL: + return + + element._pull_done() + + # Build jobs will check the "approximate" size first. Since we + # do not get an artifact size from pull jobs, we have to + # actually check the cache size. + if status == JobStatus.OK: + self._scheduler.check_cache_size() diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py new file mode 100644 index 000000000..1efcffc16 --- /dev/null +++ b/src/buildstream/_scheduler/queues/queue.py @@ -0,0 +1,328 @@ +# +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. 
+# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> +# Jürg Billeter <juerg.billeter@codethink.co.uk> + +# System imports +import os +from collections import deque +from enum import Enum +import traceback + +# Local imports +from ..jobs import ElementJob, JobStatus +from ..resources import ResourceType + +# BuildStream toplevel imports +from ..._exceptions import BstError, set_last_task_error +from ..._message import Message, MessageType + + +# Queue status for a given element +# +# +class QueueStatus(Enum): + # The element is waiting for dependencies. + WAIT = 1 + + # The element can skip this queue. + SKIP = 2 + + # The element is ready for processing in this queue. + READY = 3 + + +# Queue() +# +# Args: +# scheduler (Scheduler): The Scheduler +# +class Queue(): + + # These should be overridden on class data of concrete Queue implementations + action_name = None + complete_name = None + resources = [] # Resources this queue's jobs want + + def __init__(self, scheduler): + + # + # Public members + # + self.failed_elements = [] # List of failed elements, for the frontend + self.processed_elements = [] # List of processed elements, for the frontend + self.skipped_elements = [] # List of skipped elements, for the frontend + + # + # Private members + # + self._scheduler = scheduler + self._resources = scheduler.resources # Shared resource pool + self._wait_queue = deque() # Ready / Waiting elements + self._done_queue = deque() # Processed / Skipped elements + self._max_retries = 0 + + # Assert the subclass has setup class data + assert self.action_name is not None + assert self.complete_name is not None + + if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources: + self._max_retries = scheduler.context.sched_network_retries + + 
##################################################### + # Abstract Methods for Queue implementations # + ##################################################### + + # process() + # + # Abstract method for processing an element + # + # Args: + # element (Element): An element to process + # + # Returns: + # (any): An optional something to be returned + # for every element successfully processed + # + # + def process(self, element): + pass + + # status() + # + # Abstract method for reporting the status of an element. + # + # Args: + # element (Element): An element to process + # + # Returns: + # (QueueStatus): The element status + # + def status(self, element): + return QueueStatus.READY + + # done() + # + # Abstract method for handling a job completion, successful or not (see `status`). + # + # Args: + # job (Job): The job which completed processing + # element (Element): The element which completed processing + # result (any): The return value of the process() implementation + # status (JobStatus): The return status of the Job + # + def done(self, job, element, result, status): + pass + + ##################################################### + # Scheduler / Pipeline facing APIs # + ##################################################### + + # enqueue() + # + # Enqueues some elements + # + # Args: + # elts (list): A list of Elements + # + def enqueue(self, elts): + if not elts: + return + + # Place skipped elements on the done queue right away. + # + # The remaining ready and waiting elements must remain in the + # same queue, and ready status must be determined at the moment + # at which the scheduler is asking for the next job. 
+ # + skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP] + wait = [elt for elt in elts if elt not in skip] + + self.skipped_elements.extend(skip) # Public record of skipped elements + self._done_queue.extend(skip) # Elements to be processed + self._wait_queue.extend(wait) # Elements eligible to be dequeued + + # dequeue() + # + # A generator which dequeues the elements which + # are ready to exit the queue. + # + # Yields: + # (Element): Elements being dequeued + # + def dequeue(self): + while self._done_queue: + yield self._done_queue.popleft() + + # dequeue_ready() + # + # Reports whether any elements can be promoted to other queues + # + # Returns: + # (bool): Whether there are elements ready + # + def dequeue_ready(self): + return any(self._done_queue) + + # harvest_jobs() + # + # Process elements in the queue, moving elements which were enqueued + # into the dequeue pool, and creating as many jobs for which resources + # can be reserved. + # + # Returns: + # ([Job]): A list of jobs which can be run now + # + def harvest_jobs(self): + unready = [] + ready = [] + + while self._wait_queue: + if not self._resources.reserve(self.resources, peek=True): + break + + element = self._wait_queue.popleft() + status = self.status(element) + + if status == QueueStatus.WAIT: + unready.append(element) + elif status == QueueStatus.SKIP: + self._done_queue.append(element) + self.skipped_elements.append(element) + else: + reserved = self._resources.reserve(self.resources) + assert reserved + ready.append(element) + + self._wait_queue.extendleft(unready) + + return [ + ElementJob(self._scheduler, self.action_name, + self._element_log_path(element), + element=element, queue=self, + action_cb=self.process, + complete_cb=self._job_done, + max_retries=self._max_retries) + for element in ready + ] + + ##################################################### + # Private Methods # + ##################################################### + + # _update_workspaces() + # + # 
Updates and possibly saves the workspaces in the + # main data model in the main process after a job completes. + # + # Args: + # element (Element): The element which completed + # job (Job): The job which completed + # + def _update_workspaces(self, element, job): + workspace_dict = None + if job.child_data: + workspace_dict = job.child_data.get('workspace', None) + + # Handle any workspace modifications now + # + if workspace_dict: + context = element._get_context() + workspaces = context.get_workspaces() + if workspaces.update_workspace(element._get_full_name(), workspace_dict): + try: + workspaces.save_config() + except BstError as e: + self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e)) + except Exception as e: # pylint: disable=broad-except + self._message(element, MessageType.BUG, + "Unhandled exception while saving workspaces", + detail=traceback.format_exc()) + + # _job_done() + # + # A callback reported by the Job() when a job completes + # + # This will call the Queue implementation specific Queue.done() + # implementation and trigger the scheduler to reschedule. + # + # See the Job object for an explanation of the call signature + # + def _job_done(self, job, element, status, result): + + # Now release the resources we reserved + # + self._resources.release(self.resources) + + # Update values that need to be synchronized in the main task + # before calling any queue implementation + self._update_workspaces(element, job) + + # Give the result of the job to the Queue implementor, + # and determine if it should be considered as processed + # or skipped. 
+ try: + self.done(job, element, result, status) + except BstError as e: + + # Report error and mark as failed + # + self._message(element, MessageType.ERROR, "Post processing error", detail=str(e)) + self.failed_elements.append(element) + + # Treat this as a task error as it's related to a task + # even though it did not occur in the task context + # + # This just allows us stronger testing capability + # + set_last_task_error(e.domain, e.reason) + + except Exception as e: # pylint: disable=broad-except + + # Report unhandled exceptions and mark as failed + # + self._message(element, MessageType.BUG, + "Unhandled exception in post processing", + detail=traceback.format_exc()) + self.failed_elements.append(element) + else: + # All elements get placed on the done queue for later processing. + self._done_queue.append(element) + + # These lists are for bookkeeping purposes for the UI and logging. + if status == JobStatus.SKIPPED or job.get_terminated(): + self.skipped_elements.append(element) + elif status == JobStatus.OK: + self.processed_elements.append(element) + else: + self.failed_elements.append(element) + + # Convenience wrapper for Queue implementations to send + # a message for the element they are processing + def _message(self, element, message_type, brief, **kwargs): + context = element._get_context() + message = Message(element._unique_id, message_type, brief, **kwargs) + context.message(message) + + def _element_log_path(self, element): + project = element._get_project() + key = element._get_display_key()[1] + action = self.action_name.lower() + logfile = "{key}-{action}".format(key=key, action=action) + + return os.path.join(project.name, element.normal_name, logfile) diff --git a/src/buildstream/_scheduler/queues/sourcepushqueue.py b/src/buildstream/_scheduler/queues/sourcepushqueue.py new file mode 100644 index 000000000..c38460e6a --- /dev/null +++ b/src/buildstream/_scheduler/queues/sourcepushqueue.py @@ -0,0 +1,42 @@ +# +# Copyright (C) 2019 
Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk> + +from . import Queue, QueueStatus +from ..resources import ResourceType +from ..._exceptions import SkipJob + + +# A queue which pushes staged sources +# +class SourcePushQueue(Queue): + + action_name = "Src-push" + complete_name = "Sources pushed" + resources = [ResourceType.UPLOAD] + + def process(self, element): + # Returns whether a source was pushed or not + if not element._source_push(): + raise SkipJob(self.action_name) + + def status(self, element): + if element._skip_source_push(): + return QueueStatus.SKIP + + return QueueStatus.READY diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py new file mode 100644 index 000000000..72a79a532 --- /dev/null +++ b/src/buildstream/_scheduler/queues/trackqueue.py @@ -0,0 +1,62 @@ +# +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. 
+# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. +# +# Authors: +# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> +# Jürg Billeter <juerg.billeter@codethink.co.uk> + +# BuildStream toplevel imports +from ...plugin import Plugin + +# Local imports +from . import Queue, QueueStatus +from ..resources import ResourceType +from ..jobs import JobStatus + + +# A queue which tracks sources +# +class TrackQueue(Queue): + + action_name = "Track" + complete_name = "Tracked" + resources = [ResourceType.DOWNLOAD] + + def process(self, element): + return element._track() + + def status(self, element): + # We can skip elements entirely if they have no sources. 
+ if not list(element.sources()): + + # But we still have to mark them as tracked + element._tracking_done() + return QueueStatus.SKIP + + return QueueStatus.READY + + def done(self, _, element, result, status): + + if status == JobStatus.FAIL: + return + + # Set the new refs in the main process one by one as they complete, + # writing to bst files this time + for unique_id, new_ref in result: + source = Plugin._lookup(unique_id) + source._set_ref(new_ref, save=True) + + element._tracking_done() diff --git a/src/buildstream/_scheduler/resources.py b/src/buildstream/_scheduler/resources.py new file mode 100644 index 000000000..73bf66b4a --- /dev/null +++ b/src/buildstream/_scheduler/resources.py @@ -0,0 +1,166 @@ +class ResourceType(): + CACHE = 0 + DOWNLOAD = 1 + PROCESS = 2 + UPLOAD = 3 + + +class Resources(): + def __init__(self, num_builders, num_fetchers, num_pushers): + self._max_resources = { + ResourceType.CACHE: 0, + ResourceType.DOWNLOAD: num_fetchers, + ResourceType.PROCESS: num_builders, + ResourceType.UPLOAD: num_pushers + } + + # Resources jobs are currently using. + self._used_resources = { + ResourceType.CACHE: 0, + ResourceType.DOWNLOAD: 0, + ResourceType.PROCESS: 0, + ResourceType.UPLOAD: 0 + } + + # Resources jobs currently want exclusive access to. The set + # of jobs that have asked for exclusive access is the value - + # this is so that we can avoid scheduling any other jobs until + # *all* exclusive jobs that "register interest" have finished + # - which avoids starving them of scheduling time. 
+ self._exclusive_resources = { + ResourceType.CACHE: set(), + ResourceType.DOWNLOAD: set(), + ResourceType.PROCESS: set(), + ResourceType.UPLOAD: set() + } + + # reserve() + # + # Reserves a set of resources + # + # Args: + # resources (set): A set of ResourceTypes + # exclusive (set): Another set of ResourceTypes + # peek (bool): Whether to only peek at whether the resource is available + # + # Returns: + # (bool): True if the resources could be reserved + # + def reserve(self, resources, exclusive=None, *, peek=False): + if exclusive is None: + exclusive = set() + + resources = set(resources) + exclusive = set(exclusive) + + # First, we check if the job wants to access a resource that + # another job wants exclusive access to. If so, it cannot be + # scheduled. + # + # Note that if *both* jobs want this exclusively, we don't + # fail yet. + # + # FIXME: I *think* we can deadlock if two jobs want disjoint + # sets of exclusive and non-exclusive resources. This + # is currently not possible, but may be worth thinking + # about. + # + for resource in resources - exclusive: + + # If our job wants this resource exclusively, we never + # check this, so we can get away with not (temporarily) + # removing it from the set. + if self._exclusive_resources[resource]: + return False + + # Now we check if anything is currently using any resources + # this job wants exclusively. If so, the job cannot be + # scheduled. + # + # Since jobs that use a resource exclusively are also using + # it, this means only one exclusive job can ever be scheduled + # at a time, despite being allowed to be part of the exclusive + # set. + # + for resource in exclusive: + if self._used_resources[resource] != 0: + return False + + # Finally, we check if we have enough of each resource + # available. If we don't have enough, the job cannot be + # scheduled. 
+ for resource in resources: + if (self._max_resources[resource] > 0 and + self._used_resources[resource] >= self._max_resources[resource]): + return False + + # Now we register the fact that our job is using the resources + # it asked for, and tell the scheduler that it is allowed to + # continue. + if not peek: + for resource in resources: + self._used_resources[resource] += 1 + + return True + + # release() + # + # Release resources previously reserved with Resources.reserve() + # + # Args: + # resources (set): A set of resources to release + # + def release(self, resources): + for resource in resources: + assert self._used_resources[resource] > 0, "Scheduler resource imbalance" + self._used_resources[resource] -= 1 + + # register_exclusive_interest() + # + # Inform the resources pool that `source` has an interest in + # reserving this resource exclusively. + # + # The source parameter is used to identify the caller, it + # must be ensured to be unique for the time that the + # interest is registered. + # + # This function may be called multiple times, and subsequent + # calls will simply have no effect until unregister_exclusive_interest() + # is used to clear the interest. + # + # This must be called in advance of reserve() + # + # Args: + # resources (set): Set of resources to reserve exclusively + # source (any): Source identifier, to be used again when unregistering + # the interest. + # + def register_exclusive_interest(self, resources, source): + + # The very first thing we do is to register any exclusive + # resources this job may want. Even if the job is not yet + # allowed to run (because another job is holding the resource + # it wants), we can still set this - it just means that any + # job *currently* using these resources has to finish first, + # and no new jobs wanting these can be launched (except other + # exclusive-access jobs). 
+ # + for resource in resources: + self._exclusive_resources[resource].add(source) + + # unregister_exclusive_interest() + # + # Clear the exclusive interest in these resources. + # + # This should be called by the given source which registered + # an exclusive interest. + # + # Args: + # resources (set): Set of resources to clear the exclusive interest in + # source (any): Source identifier, the same one used when registering + # the interest. + # + def unregister_exclusive_interest(self, resources, source): + + for resource in resources: + self._exclusive_resources[resource].discard(source) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py new file mode 100644 index 000000000..50ad7f07a --- /dev/null +++ b/src/buildstream/_scheduler/scheduler.py @@ -0,0 +1,602 @@ +# +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. 
+# +# Authors: +# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> +# Jürg Billeter <juerg.billeter@codethink.co.uk> + +# System imports +import os +import asyncio +from itertools import chain +import signal +import datetime +from contextlib import contextmanager + +# Local imports +from .resources import Resources, ResourceType +from .jobs import JobStatus, CacheSizeJob, CleanupJob +from .._profile import Topics, PROFILER + + +# A decent return code for Scheduler.run() +class SchedStatus(): + SUCCESS = 0 + ERROR = -1 + TERMINATED = 1 + + +# Some action names for the internal jobs we launch +# +_ACTION_NAME_CLEANUP = 'clean' +_ACTION_NAME_CACHE_SIZE = 'size' + + +# Scheduler() +# +# The scheduler operates on a list of queues, each of which is meant to accomplish +# a specific task. Elements enter the first queue when Scheduler.run() is called +# and into the next queue when complete. Scheduler.run() returns when all of the +# elements have been traversed or when an error occurs. +# +# Using the scheduler is a matter of: +# a.) Deriving the Queue class and implementing its abstract methods +# b.) Instantiating a Scheduler +# c.) Calling Scheduler.run(queues) with a list of queues +# d.) 
Fetching results from your queues +# +# Args: +# context: The Context in the parent scheduling process +# start_time: The time at which the session started +# interrupt_callback: A callback to handle ^C +# ticker_callback: A callback called once per second +# job_start_callback: A callback called when each job starts +# job_complete_callback: A callback called when each job completes +# +class Scheduler(): + + def __init__(self, context, + start_time, + interrupt_callback=None, + ticker_callback=None, + job_start_callback=None, + job_complete_callback=None): + + # + # Public members + # + self.queues = None # Exposed for the frontend to print summaries + self.context = context # The Context object shared with Queues + self.terminated = False # Whether the scheduler was asked to terminate or has terminated + self.suspended = False # Whether the scheduler is currently suspended + + # These are shared with the Job, but should probably be removed or made private in some way. + self.loop = None # Shared for Job access to observe the message queue + self.internal_stops = 0 # Amount of SIGTSTP signals we've introduced, this is shared with job.py + + # + # Private members + # + self._active_jobs = [] # Jobs currently being run in the scheduler + self._starttime = start_time # Initial application start time + self._suspendtime = None # Session time compensation for suspended state + self._queue_jobs = True # Whether we should continue to queue jobs + + # State of cache management related jobs + self._cache_size_scheduled = False # Whether we have a cache size job scheduled + self._cache_size_running = None # A running CacheSizeJob, or None + self._cleanup_scheduled = False # Whether we have a cleanup job scheduled + self._cleanup_running = None # A running CleanupJob, or None + + # Callbacks to report back to the Scheduler owner + self._interrupt_callback = interrupt_callback + self._ticker_callback = ticker_callback + self._job_start_callback = job_start_callback + 
self._job_complete_callback = job_complete_callback + + # Whether our exclusive jobs, like 'cleanup' are currently already + # waiting or active. + # + # This is just a bit quicker than scanning the wait queue and active + # queue and comparing job action names. + # + self._exclusive_waiting = set() + self._exclusive_active = set() + + self.resources = Resources(context.sched_builders, + context.sched_fetchers, + context.sched_pushers) + + # run() + # + # Args: + # queues (list): A list of Queue objects + # + # Returns: + # (timedelta): The amount of time since the start of the session, + # discounting any time spent while jobs were suspended + # (SchedStatus): How the scheduling terminated + # + # Elements in the 'plan' will be processed by each + # queue in order. Processing will complete when all + # elements have been processed by each queue or when + # an error arises + # + def run(self, queues): + + # Hold on to the queues to process + self.queues = queues + + # Ensure that we have a fresh new event loop, in case we want + # to run another test in this thread. 
+ self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + # Add timeouts + if self._ticker_callback: + self.loop.call_later(1, self._tick) + + # Handle unix signals while running + self._connect_signals() + + # Check if we need to start with some cache maintenance + self._check_cache_management() + + # Start the profiler + with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): + # Run the queues + self._sched() + self.loop.run_forever() + self.loop.close() + + # Stop handling unix signals + self._disconnect_signals() + + failed = any(any(queue.failed_elements) for queue in self.queues) + self.loop = None + + if failed: + status = SchedStatus.ERROR + elif self.terminated: + status = SchedStatus.TERMINATED + else: + status = SchedStatus.SUCCESS + + return self.elapsed_time(), status + + # terminate_jobs() + # + # Forcefully terminates all ongoing jobs. + # + # For this to be effective, one needs to return to + # the scheduler loop first and allow the scheduler + # to complete gracefully. + # + # NOTE: This will block SIGINT so that graceful process + # termination is not interrupted, and SIGINT will + # remain blocked after Scheduler.run() returns. + # + def terminate_jobs(self): + + # Set this right away, the frontend will check this + # attribute to decide whether or not to print status info + # etc and the following code block will trigger some callbacks. + self.terminated = True + self.loop.call_soon(self._terminate_jobs_real) + + # Block this until we're finished terminating jobs, + # this will remain blocked forever. 
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT]) + + # jobs_suspended() + # + # A context manager for running with jobs suspended + # + @contextmanager + def jobs_suspended(self): + self._disconnect_signals() + self._suspend_jobs() + + yield + + self._resume_jobs() + self._connect_signals() + + # stop_queueing() + # + # Stop queueing additional jobs, causes Scheduler.run() + # to return once all currently processing jobs are finished. + # + def stop_queueing(self): + self._queue_jobs = False + + # elapsed_time() + # + # Fetches the current session elapsed time + # + # Returns: + # (timedelta): The amount of time since the start of the session, + # discounting any time spent while jobs were suspended. + # + def elapsed_time(self): + timenow = datetime.datetime.now() + starttime = self._starttime + if not starttime: + starttime = timenow + return timenow - starttime + + # job_completed(): + # + # Called when a Job completes + # + # Args: + # queue (Queue): The Queue holding a complete job + # job (Job): The completed Job + # status (JobStatus): The status of the completed job + # + def job_completed(self, job, status): + + # Remove from the active jobs list + self._active_jobs.remove(job) + + # Scheduler owner facing callback + self._job_complete_callback(job, status) + + # Now check for more jobs + self._sched() + + # check_cache_size(): + # + # Queues a cache size calculation job, after the cache + # size is calculated, a cleanup job will be run automatically + # if needed. + # + def check_cache_size(self): + + # Here we assume we are called in response to a job + # completion callback, or before entering the scheduler. + # + # As such there is no need to call `_sched()` from here, + # and we prefer to run it once at the last moment. 
+ # + self._cache_size_scheduled = True + + ####################################################### + # Local Private Methods # + ####################################################### + + # _check_cache_management() + # + # Run an initial check if we need to lock the cache + # resource and check the size and possibly launch + # a cleanup. + # + # Sessions which do not add to the cache are not affected. + # + def _check_cache_management(self): + + # Only trigger the check for a scheduler run which has + # queues which require the CACHE resource. + if not any(q for q in self.queues + if ResourceType.CACHE in q.resources): + return + + # If the estimated size outgrows the quota, queue a job to + # actually check the real cache size initially, this one + # should have exclusive access to the cache to ensure nothing + # starts while we are checking the cache. + # + artifacts = self.context.artifactcache + if artifacts.full(): + self._sched_cache_size_job(exclusive=True) + + # _spawn_job() + # + # Spanws a job + # + # Args: + # job (Job): The job to spawn + # + def _spawn_job(self, job): + self._active_jobs.append(job) + if self._job_start_callback: + self._job_start_callback(job) + job.spawn() + + # Callback for the cache size job + def _cache_size_job_complete(self, status, cache_size): + + # Deallocate cache size job resources + self._cache_size_running = None + self.resources.release([ResourceType.CACHE, ResourceType.PROCESS]) + + # Unregister the exclusive interest if there was any + self.resources.unregister_exclusive_interest( + [ResourceType.CACHE], 'cache-size' + ) + + # Schedule a cleanup job if we've hit the threshold + if status != JobStatus.OK: + return + + context = self.context + artifacts = context.artifactcache + + if artifacts.full(): + self._cleanup_scheduled = True + + # Callback for the cleanup job + def _cleanup_job_complete(self, status, cache_size): + + # Deallocate cleanup job resources + self._cleanup_running = None + 
self.resources.release([ResourceType.CACHE, ResourceType.PROCESS]) + + # Unregister the exclusive interest when we're done with it + if not self._cleanup_scheduled: + self.resources.unregister_exclusive_interest( + [ResourceType.CACHE], 'cache-cleanup' + ) + + # _sched_cleanup_job() + # + # Runs a cleanup job if one is scheduled to run now and + # sufficient recources are available. + # + def _sched_cleanup_job(self): + + if self._cleanup_scheduled and self._cleanup_running is None: + + # Ensure we have an exclusive interest in the resources + self.resources.register_exclusive_interest( + [ResourceType.CACHE], 'cache-cleanup' + ) + + if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS], + [ResourceType.CACHE]): + + # Update state and launch + self._cleanup_scheduled = False + self._cleanup_running = \ + CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup', + complete_cb=self._cleanup_job_complete) + self._spawn_job(self._cleanup_running) + + # _sched_cache_size_job() + # + # Runs a cache size job if one is scheduled to run now and + # sufficient recources are available. + # + # Args: + # exclusive (bool): Run a cache size job immediately and + # hold the ResourceType.CACHE resource + # exclusively (used at startup). + # + def _sched_cache_size_job(self, *, exclusive=False): + + # The exclusive argument is not intended (or safe) for arbitrary use. 
+ if exclusive: + assert not self._cache_size_scheduled + assert not self._cache_size_running + assert not self._active_jobs + self._cache_size_scheduled = True + + if self._cache_size_scheduled and not self._cache_size_running: + + # Handle the exclusive launch + exclusive_resources = set() + if exclusive: + exclusive_resources.add(ResourceType.CACHE) + self.resources.register_exclusive_interest( + exclusive_resources, 'cache-size' + ) + + # Reserve the resources (with the possible exclusive cache resource) + if self.resources.reserve([ResourceType.CACHE, ResourceType.PROCESS], + exclusive_resources): + + # Update state and launch + self._cache_size_scheduled = False + self._cache_size_running = \ + CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE, + 'cache_size/cache_size', + complete_cb=self._cache_size_job_complete) + self._spawn_job(self._cache_size_running) + + # _sched_queue_jobs() + # + # Ask the queues what jobs they want to schedule and schedule + # them. This is done here so we can ask for new jobs when jobs + # from previous queues become available. + # + # This will process the Queues, pull elements through the Queues + # and process anything that is ready. + # + def _sched_queue_jobs(self): + ready = [] + process_queues = True + + while self._queue_jobs and process_queues: + + # Pull elements forward through queues + elements = [] + for queue in self.queues: + queue.enqueue(elements) + elements = list(queue.dequeue()) + + # Kickoff whatever processes can be processed at this time + # + # We start by queuing from the last queue first, because + # we want to give priority to queues later in the + # scheduling process in the case that multiple queues + # share the same token type. 
+ # + # This avoids starvation situations where we dont move on + # to fetch tasks for elements which failed to pull, and + # thus need all the pulls to complete before ever starting + # a build + ready.extend(chain.from_iterable( + q.harvest_jobs() for q in reversed(self.queues) + )) + + # harvest_jobs() may have decided to skip some jobs, making + # them eligible for promotion to the next queue as a side effect. + # + # If that happens, do another round. + process_queues = any(q.dequeue_ready() for q in self.queues) + + # Spawn the jobs + # + for job in ready: + self._spawn_job(job) + + # _sched() + # + # Run any jobs which are ready to run, or quit the main loop + # when nothing is running or is ready to run. + # + # This is the main driving function of the scheduler, it is called + # initially when we enter Scheduler.run(), and at the end of whenever + # any job completes, after any bussiness logic has occurred and before + # going back to sleep. + # + def _sched(self): + + if not self.terminated: + + # + # Try the cache management jobs + # + self._sched_cleanup_job() + self._sched_cache_size_job() + + # + # Run as many jobs as the queues can handle for the + # available resources + # + self._sched_queue_jobs() + + # + # If nothing is ticking then bail out + # + if not self._active_jobs: + self.loop.stop() + + # _suspend_jobs() + # + # Suspend all ongoing jobs. + # + def _suspend_jobs(self): + if not self.suspended: + self._suspendtime = datetime.datetime.now() + self.suspended = True + for job in self._active_jobs: + job.suspend() + + # _resume_jobs() + # + # Resume suspended jobs. 
+ # + def _resume_jobs(self): + if self.suspended: + for job in self._active_jobs: + job.resume() + self.suspended = False + self._starttime += (datetime.datetime.now() - self._suspendtime) + self._suspendtime = None + + # _interrupt_event(): + # + # A loop registered event callback for keyboard interrupts + # + def _interrupt_event(self): + + # FIXME: This should not be needed, but for some reason we receive an + # additional SIGINT event when the user hits ^C a second time + # to inform us that they really intend to terminate; even though + # we have disconnected our handlers at this time. + # + if self.terminated: + return + + # Leave this to the frontend to decide, if no + # interrrupt callback was specified, then just terminate. + if self._interrupt_callback: + self._interrupt_callback() + else: + # Default without a frontend is just terminate + self.terminate_jobs() + + # _terminate_event(): + # + # A loop registered event callback for SIGTERM + # + def _terminate_event(self): + self.terminate_jobs() + + # _suspend_event(): + # + # A loop registered event callback for SIGTSTP + # + def _suspend_event(self): + + # Ignore the feedback signals from Job.suspend() + if self.internal_stops: + self.internal_stops -= 1 + return + + # No need to care if jobs were suspended or not, we _only_ handle this + # while we know jobs are not suspended. 
+ self._suspend_jobs() + os.kill(os.getpid(), signal.SIGSTOP) + self._resume_jobs() + + # _connect_signals(): + # + # Connects our signal handler event callbacks to the mainloop + # + def _connect_signals(self): + self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event) + self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event) + self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event) + + def _disconnect_signals(self): + self.loop.remove_signal_handler(signal.SIGINT) + self.loop.remove_signal_handler(signal.SIGTSTP) + self.loop.remove_signal_handler(signal.SIGTERM) + + def _terminate_jobs_real(self): + # 20 seconds is a long time, it can take a while and sometimes + # we still fail, need to look deeper into this again. + wait_start = datetime.datetime.now() + wait_limit = 20.0 + + # First tell all jobs to terminate + for job in self._active_jobs: + job.terminate() + + # Now wait for them to really terminate + for job in self._active_jobs: + elapsed = datetime.datetime.now() - wait_start + timeout = max(wait_limit - elapsed.total_seconds(), 0.0) + if not job.terminate_wait(timeout): + job.kill() + + # Regular timeout for driving status in the UI + def _tick(self): + elapsed = self.elapsed_time() + self._ticker_callback(elapsed) + self.loop.call_later(1, self._tick) |