From 70cd94afa9a05ef0005cf85aa0db1d795b64bbd3 Mon Sep 17 00:00:00 2001 From: Tristan Maat Date: Mon, 16 Jul 2018 14:38:25 +0100 Subject: buildstream/_scheduler/*queue.py: Move queues to a subdirectory --- buildstream/_scheduler/__init__.py | 12 +- buildstream/_scheduler/buildqueue.py | 59 ----- buildstream/_scheduler/fetchqueue.py | 79 ------- buildstream/_scheduler/pullqueue.py | 63 ----- buildstream/_scheduler/pushqueue.py | 51 ---- buildstream/_scheduler/queue.py | 355 ---------------------------- buildstream/_scheduler/queues/__init__.py | 1 + buildstream/_scheduler/queues/buildqueue.py | 59 +++++ buildstream/_scheduler/queues/fetchqueue.py | 79 +++++++ buildstream/_scheduler/queues/pullqueue.py | 63 +++++ buildstream/_scheduler/queues/pushqueue.py | 51 ++++ buildstream/_scheduler/queues/queue.py | 355 ++++++++++++++++++++++++++++ buildstream/_scheduler/queues/trackqueue.py | 76 ++++++ buildstream/_scheduler/scheduler.py | 2 +- buildstream/_scheduler/trackqueue.py | 76 ------ 15 files changed, 691 insertions(+), 690 deletions(-) delete mode 100644 buildstream/_scheduler/buildqueue.py delete mode 100644 buildstream/_scheduler/fetchqueue.py delete mode 100644 buildstream/_scheduler/pullqueue.py delete mode 100644 buildstream/_scheduler/pushqueue.py delete mode 100644 buildstream/_scheduler/queue.py create mode 100644 buildstream/_scheduler/queues/__init__.py create mode 100644 buildstream/_scheduler/queues/buildqueue.py create mode 100644 buildstream/_scheduler/queues/fetchqueue.py create mode 100644 buildstream/_scheduler/queues/pullqueue.py create mode 100644 buildstream/_scheduler/queues/pushqueue.py create mode 100644 buildstream/_scheduler/queues/queue.py create mode 100644 buildstream/_scheduler/queues/trackqueue.py delete mode 100644 buildstream/_scheduler/trackqueue.py diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py index 80523db6f..a53a133c2 100644 --- a/buildstream/_scheduler/__init__.py +++ 
b/buildstream/_scheduler/__init__.py @@ -17,12 +17,12 @@ # Authors: # Tristan Van Berkom -from .queue import Queue, QueueStatus, QueueType +from .queues import Queue, QueueStatus, QueueType -from .fetchqueue import FetchQueue -from .trackqueue import TrackQueue -from .buildqueue import BuildQueue -from .pushqueue import PushQueue -from .pullqueue import PullQueue +from .queues.fetchqueue import FetchQueue +from .queues.trackqueue import TrackQueue +from .queues.buildqueue import BuildQueue +from .queues.pushqueue import PushQueue +from .queues.pullqueue import PullQueue from .scheduler import Scheduler, SchedStatus diff --git a/buildstream/_scheduler/buildqueue.py b/buildstream/_scheduler/buildqueue.py deleted file mode 100644 index 50ba312ff..000000000 --- a/buildstream/_scheduler/buildqueue.py +++ /dev/null @@ -1,59 +0,0 @@ -# -# Copyright (C) 2016 Codethink Limited -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see . -# -# Authors: -# Tristan Van Berkom -# Jürg Billeter - -from . 
import Queue, QueueStatus, QueueType - - -# A queue which assembles elements -# -class BuildQueue(Queue): - - action_name = "Build" - complete_name = "Built" - queue_type = QueueType.BUILD - - def process(self, element): - element._assemble() - return element._get_unique_id() - - def status(self, element): - # state of dependencies may have changed, recalculate element state - element._update_state() - - if not element._is_required(): - # Artifact is not currently required but it may be requested later. - # Keep it in the queue. - return QueueStatus.WAIT - - if element._cached(): - return QueueStatus.SKIP - - if not element._buildable(): - return QueueStatus.WAIT - - return QueueStatus.READY - - def done(self, element, result, success): - - if success: - # Inform element in main process that assembly is done - element._assemble_done() - - return True diff --git a/buildstream/_scheduler/fetchqueue.py b/buildstream/_scheduler/fetchqueue.py deleted file mode 100644 index 24512bddb..000000000 --- a/buildstream/_scheduler/fetchqueue.py +++ /dev/null @@ -1,79 +0,0 @@ -# -# Copyright (C) 2016 Codethink Limited -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see . -# -# Authors: -# Tristan Van Berkom -# Jürg Billeter - -# BuildStream toplevel imports -from .. import Consistency - -# Local imports -from . 
import Queue, QueueStatus, QueueType - - -# A queue which fetches element sources -# -class FetchQueue(Queue): - - action_name = "Fetch" - complete_name = "Fetched" - queue_type = QueueType.FETCH - - def __init__(self, scheduler, skip_cached=False): - super().__init__(scheduler) - - self._skip_cached = skip_cached - - def process(self, element): - for source in element.sources(): - source._fetch() - - def status(self, element): - # state of dependencies may have changed, recalculate element state - element._update_state() - - if not element._is_required(): - # Artifact is not currently required but it may be requested later. - # Keep it in the queue. - return QueueStatus.WAIT - - # Optionally skip elements that are already in the artifact cache - if self._skip_cached: - if not element._can_query_cache(): - return QueueStatus.WAIT - - if element._cached(): - return QueueStatus.SKIP - - # This will automatically skip elements which - # have no sources. - if element._get_consistency() == Consistency.CACHED: - return QueueStatus.SKIP - - return QueueStatus.READY - - def done(self, element, result, success): - - if not success: - return False - - element._update_state() - - # Successful fetch, we must be CACHED now - assert element._get_consistency() == Consistency.CACHED - - return True diff --git a/buildstream/_scheduler/pullqueue.py b/buildstream/_scheduler/pullqueue.py deleted file mode 100644 index b4f5b0d73..000000000 --- a/buildstream/_scheduler/pullqueue.py +++ /dev/null @@ -1,63 +0,0 @@ -# -# Copyright (C) 2016 Codethink Limited -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2 of the License, or (at your option) any later version. 
-# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see . -# -# Authors: -# Tristan Van Berkom -# Jürg Billeter - -# Local imports -from . import Queue, QueueStatus, QueueType - - -# A queue which pulls element artifacts -# -class PullQueue(Queue): - - action_name = "Pull" - complete_name = "Pulled" - queue_type = QueueType.FETCH - - def process(self, element): - # returns whether an artifact was downloaded or not - return element._pull() - - def status(self, element): - # state of dependencies may have changed, recalculate element state - element._update_state() - - if not element._is_required(): - # Artifact is not currently required but it may be requested later. - # Keep it in the queue. - return QueueStatus.WAIT - - if not element._can_query_cache(): - return QueueStatus.WAIT - - if element._pull_pending(): - return QueueStatus.READY - else: - return QueueStatus.SKIP - - def done(self, element, result, success): - - if not success: - return False - - element._pull_done() - - # Element._pull() returns True if it downloaded an artifact, - # here we want to appear skipped if we did not download. - return result diff --git a/buildstream/_scheduler/pushqueue.py b/buildstream/_scheduler/pushqueue.py deleted file mode 100644 index 624eefd1d..000000000 --- a/buildstream/_scheduler/pushqueue.py +++ /dev/null @@ -1,51 +0,0 @@ -# -# Copyright (C) 2016 Codethink Limited -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2 of the License, or (at your option) any later version. 
-# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see . -# -# Authors: -# Tristan Van Berkom -# Jürg Billeter - -# Local imports -from . import Queue, QueueStatus, QueueType - - -# A queue which pushes element artifacts -# -class PushQueue(Queue): - - action_name = "Push" - complete_name = "Pushed" - queue_type = QueueType.PUSH - - def process(self, element): - # returns whether an artifact was uploaded or not - return element._push() - - def status(self, element): - if element._skip_push(): - return QueueStatus.SKIP - - return QueueStatus.READY - - def done(self, element, result, success): - - if not success: - return False - - # Element._push() returns True if it uploaded an artifact, - # here we want to appear skipped if the remote already had - # the artifact. - return result diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queue.py deleted file mode 100644 index 15caf8348..000000000 --- a/buildstream/_scheduler/queue.py +++ /dev/null @@ -1,355 +0,0 @@ -# -# Copyright (C) 2016 Codethink Limited -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see . 
-# -# Authors: -# Tristan Van Berkom -# Jürg Billeter - -# System imports -from collections import deque -from enum import Enum -import traceback - -# Local imports -from .job import Job - -# BuildStream toplevel imports -from .._exceptions import BstError, set_last_task_error -from .._message import Message, MessageType - - -# Indicates the kind of activity -# -# -class QueueType(): - # Tasks which download stuff from the internet - FETCH = 1 - - # CPU/Disk intensive tasks - BUILD = 2 - - # Tasks which upload stuff to the internet - PUSH = 3 - - -# Queue status for a given element -# -# -class QueueStatus(Enum): - # The element is waiting for dependencies. - WAIT = 1 - - # The element can skip this queue. - SKIP = 2 - - # The element is ready for processing in this queue. - READY = 3 - - -# Queue() -# -# Args: -# scheduler (Scheduler): The Scheduler -# -class Queue(): - - # These should be overridden on class data of of concrete Queue implementations - action_name = None - complete_name = None - queue_type = None - - def __init__(self, scheduler): - - # - # Public members - # - self.active_jobs = [] # List of active ongoing Jobs, for scheduler observation - self.failed_elements = [] # List of failed elements, for the frontend - self.processed_elements = [] # List of processed elements, for the frontend - self.skipped_elements = [] # List of skipped elements, for the frontend - - # - # Private members - # - self._scheduler = scheduler - self._wait_queue = deque() - self._done_queue = deque() - self._max_retries = 0 - if self.queue_type == QueueType.FETCH or self.queue_type == QueueType.PUSH: - self._max_retries = scheduler.context.sched_network_retries - - # Assert the subclass has setup class data - assert self.action_name is not None - assert self.complete_name is not None - assert self.queue_type is not None - - ##################################################### - # Abstract Methods for Queue implementations # - 
##################################################### - - # process() - # - # Abstract method for processing an element - # - # Args: - # element (Element): An element to process - # - # Returns: - # (any): An optional something to be returned - # for every element successfully processed - # - # - def process(self, element): - pass - - # status() - # - # Abstract method for reporting the status of an element. - # - # Args: - # element (Element): An element to process - # - # Returns: - # (QueueStatus): The element status - # - def status(self, element): - return QueueStatus.READY - - # prepare() - # - # Abstract method for handling job preparation in the main process. - # - # Args: - # element (Element): The element which is scheduled - # - def prepare(self, element): - pass - - # done() - # - # Abstract method for handling a successful job completion. - # - # Args: - # element (Element): The element which completed processing - # result (any): The return value of the process() implementation - # success (bool): True if the process() implementation did not - # raise any exception - # - # Returns: - # (bool): True if the element should appear to be processsed, - # Otherwise False will count the element as "skipped" - # - def done(self, element, result, success): - pass - - ##################################################### - # Scheduler / Pipeline facing APIs # - ##################################################### - - # enqueue() - # - # Enqueues some elements - # - # Args: - # elts (list): A list of Elements - # - def enqueue(self, elts): - if not elts: - return - - # Place skipped elements directly on the done queue - elts = list(elts) - skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP] - wait = [elt for elt in elts if elt not in skip] - - self._wait_queue.extend(wait) - self._done_queue.extend(skip) - self.skipped_elements.extend(skip) - - # dequeue() - # - # A generator which dequeues the elements which - # are ready to exit the queue. 
- # - # Yields: - # (Element): Elements being dequeued - # - def dequeue(self): - while self._done_queue: - yield self._done_queue.popleft() - - # dequeue_ready() - # - # Reports whether there are any elements to dequeue - # - # Returns: - # (bool): Whether there are elements to dequeue - # - def dequeue_ready(self): - return any(self._done_queue) - - # process_ready() - # - # Process elements in the queue, moving elements which were enqueued - # into the dequeue pool, and processing them if necessary. - # - # This will have different results for elements depending - # on the Queue.status() implementation. - # - # o Elements which are QueueStatus.WAIT will not be effected - # - # o Elements which are QueueStatus.READY will be processed - # and added to the Queue.active_jobs list as a result, - # given that the scheduler allows the Queue enough tokens - # for the given queue's job type - # - # o Elements which are QueueStatus.SKIP will move directly - # to the dequeue pool - # - def process_ready(self): - scheduler = self._scheduler - unready = [] - - while self._wait_queue and scheduler.get_job_token(self.queue_type): - element = self._wait_queue.popleft() - - status = self.status(element) - if status == QueueStatus.WAIT: - scheduler.put_job_token(self.queue_type) - unready.append(element) - continue - elif status == QueueStatus.SKIP: - scheduler.put_job_token(self.queue_type) - self._done_queue.append(element) - self.skipped_elements.append(element) - continue - - self.prepare(element) - - job = Job(scheduler, element, self.action_name, - self.process, self._job_done, - max_retries=self._max_retries) - scheduler.job_starting(job) - - job.spawn() - self.active_jobs.append(job) - - # These were not ready but were in the beginning, give em - # first priority again next time around - self._wait_queue.extendleft(unready) - - ##################################################### - # Private Methods # - ##################################################### - - # 
_update_workspaces() - # - # Updates and possibly saves the workspaces in the - # main data model in the main process after a job completes. - # - # Args: - # element (Element): The element which completed - # job (Job): The job which completed - # - def _update_workspaces(self, element, job): - # Handle any workspace modifications now - # - if job.workspace_dict: - context = element._get_context() - workspaces = context.get_workspaces() - if workspaces.update_workspace(element._get_full_name(), job.workspace_dict): - try: - workspaces.save_config() - except BstError as e: - self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e)) - except Exception as e: # pylint: disable=broad-except - self._message(element, MessageType.BUG, - "Unhandled exception while saving workspaces", - detail=traceback.format_exc()) - - # _job_done() - # - # A callback reported by the Job() when a job completes - # - # This will call the Queue implementation specific Queue.done() - # implementation and trigger the scheduler to reschedule. - # - # See the Job object for an explanation of the call signature - # - def _job_done(self, job, element, success, result): - - # Remove from our jobs - self.active_jobs.remove(job) - - # Update workspaces in the main task before calling any queue implementation - self._update_workspaces(element, job) - - # Give the result of the job to the Queue implementor, - # and determine if it should be considered as processed - # or skipped. 
- try: - processed = self.done(element, result, success) - - except BstError as e: - - # Report error and mark as failed - # - self._message(element, MessageType.ERROR, "Post processing error", detail=str(e)) - self.failed_elements.append(element) - - # Treat this as a task error as it's related to a task - # even though it did not occur in the task context - # - # This just allows us stronger testing capability - # - set_last_task_error(e.domain, e.reason) - - except Exception as e: # pylint: disable=broad-except - - # Report unhandled exceptions and mark as failed - # - self._message(element, MessageType.BUG, - "Unhandled exception in post processing", - detail=traceback.format_exc()) - self.failed_elements.append(element) - else: - - # No exception occured, handle the success/failure state in the normal way - # - if success: - self._done_queue.append(element) - if processed: - self.processed_elements.append(element) - else: - self.skipped_elements.append(element) - else: - self.failed_elements.append(element) - - # Give the token for this job back to the scheduler - # immediately before invoking another round of scheduling - self._scheduler.put_job_token(self.queue_type) - - # Notify frontend - self._scheduler.job_completed(self, job, success) - - self._scheduler.sched() - - # Convenience wrapper for Queue implementations to send - # a message for the element they are processing - def _message(self, element, message_type, brief, **kwargs): - context = element._get_context() - message = Message(element._get_unique_id(), message_type, brief, **kwargs) - context.message(message) diff --git a/buildstream/_scheduler/queues/__init__.py b/buildstream/_scheduler/queues/__init__.py new file mode 100644 index 000000000..b9acef18c --- /dev/null +++ b/buildstream/_scheduler/queues/__init__.py @@ -0,0 +1 @@ +from .queue import Queue, QueueStatus, QueueType diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py new file mode 
100644 index 000000000..50ba312ff --- /dev/null +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -0,0 +1,59 @@ +# +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . +# +# Authors: +# Tristan Van Berkom +# Jürg Billeter + +from . import Queue, QueueStatus, QueueType + + +# A queue which assembles elements +# +class BuildQueue(Queue): + + action_name = "Build" + complete_name = "Built" + queue_type = QueueType.BUILD + + def process(self, element): + element._assemble() + return element._get_unique_id() + + def status(self, element): + # state of dependencies may have changed, recalculate element state + element._update_state() + + if not element._is_required(): + # Artifact is not currently required but it may be requested later. + # Keep it in the queue. 
+ return QueueStatus.WAIT + + if element._cached(): + return QueueStatus.SKIP + + if not element._buildable(): + return QueueStatus.WAIT + + return QueueStatus.READY + + def done(self, element, result, success): + + if success: + # Inform element in main process that assembly is done + element._assemble_done() + + return True diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py new file mode 100644 index 000000000..bdff15667 --- /dev/null +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -0,0 +1,79 @@ +# +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . +# +# Authors: +# Tristan Van Berkom +# Jürg Billeter + +# BuildStream toplevel imports +from ... import Consistency + +# Local imports +from . 
import Queue, QueueStatus, QueueType + + +# A queue which fetches element sources +# +class FetchQueue(Queue): + + action_name = "Fetch" + complete_name = "Fetched" + queue_type = QueueType.FETCH + + def __init__(self, scheduler, skip_cached=False): + super().__init__(scheduler) + + self._skip_cached = skip_cached + + def process(self, element): + for source in element.sources(): + source._fetch() + + def status(self, element): + # state of dependencies may have changed, recalculate element state + element._update_state() + + if not element._is_required(): + # Artifact is not currently required but it may be requested later. + # Keep it in the queue. + return QueueStatus.WAIT + + # Optionally skip elements that are already in the artifact cache + if self._skip_cached: + if not element._can_query_cache(): + return QueueStatus.WAIT + + if element._cached(): + return QueueStatus.SKIP + + # This will automatically skip elements which + # have no sources. + if element._get_consistency() == Consistency.CACHED: + return QueueStatus.SKIP + + return QueueStatus.READY + + def done(self, element, result, success): + + if not success: + return False + + element._update_state() + + # Successful fetch, we must be CACHED now + assert element._get_consistency() == Consistency.CACHED + + return True diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py new file mode 100644 index 000000000..b4f5b0d73 --- /dev/null +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -0,0 +1,63 @@ +# +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. 
+# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . +# +# Authors: +# Tristan Van Berkom +# Jürg Billeter + +# Local imports +from . import Queue, QueueStatus, QueueType + + +# A queue which pulls element artifacts +# +class PullQueue(Queue): + + action_name = "Pull" + complete_name = "Pulled" + queue_type = QueueType.FETCH + + def process(self, element): + # returns whether an artifact was downloaded or not + return element._pull() + + def status(self, element): + # state of dependencies may have changed, recalculate element state + element._update_state() + + if not element._is_required(): + # Artifact is not currently required but it may be requested later. + # Keep it in the queue. + return QueueStatus.WAIT + + if not element._can_query_cache(): + return QueueStatus.WAIT + + if element._pull_pending(): + return QueueStatus.READY + else: + return QueueStatus.SKIP + + def done(self, element, result, success): + + if not success: + return False + + element._pull_done() + + # Element._pull() returns True if it downloaded an artifact, + # here we want to appear skipped if we did not download. + return result diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py new file mode 100644 index 000000000..624eefd1d --- /dev/null +++ b/buildstream/_scheduler/queues/pushqueue.py @@ -0,0 +1,51 @@ +# +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. 
+# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . +# +# Authors: +# Tristan Van Berkom +# Jürg Billeter + +# Local imports +from . import Queue, QueueStatus, QueueType + + +# A queue which pushes element artifacts +# +class PushQueue(Queue): + + action_name = "Push" + complete_name = "Pushed" + queue_type = QueueType.PUSH + + def process(self, element): + # returns whether an artifact was uploaded or not + return element._push() + + def status(self, element): + if element._skip_push(): + return QueueStatus.SKIP + + return QueueStatus.READY + + def done(self, element, result, success): + + if not success: + return False + + # Element._push() returns True if it uploaded an artifact, + # here we want to appear skipped if the remote already had + # the artifact. + return result diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py new file mode 100644 index 000000000..d0c482802 --- /dev/null +++ b/buildstream/_scheduler/queues/queue.py @@ -0,0 +1,355 @@ +# +# Copyright (C) 2016 Codethink Limited +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. 
# Indicates the kind of activity
#
# These are plain integer constants; a Queue passes its queue_type to
# the scheduler's get_job_token() / put_job_token().
#
class QueueType():
    # Tasks which download stuff from the internet
    FETCH = 1

    # CPU/Disk intensive tasks
    BUILD = 2

    # Tasks which upload stuff to the internet
    PUSH = 3


# Queue status for a given element
#
# This is what Queue.status() implementations return.
#
class QueueStatus(Enum):
    # The element is waiting for dependencies.
    WAIT = 1

    # The element can skip this queue.
    SKIP = 2

    # The element is ready for processing in this queue.
    READY = 3


# Queue()
#
# Base class for the scheduler queues. Elements are enqueued into a
# wait queue, processed in a Job when they are ready, and are finally
# moved to a done queue from which they can be dequeued.
#
# Args:
#    scheduler (Scheduler): The Scheduler
#
class Queue():

    # These should be overridden on class data of concrete Queue implementations
    action_name = None
    complete_name = None
    queue_type = None

    def __init__(self, scheduler):

        # Assert the subclass has setup class data, do this before
        # anything else so a misconfigured subclass fails early
        assert self.action_name is not None
        assert self.complete_name is not None
        assert self.queue_type is not None

        #
        # Public members
        #
        self.active_jobs = []          # List of active ongoing Jobs, for scheduler observation
        self.failed_elements = []      # List of failed elements, for the frontend
        self.processed_elements = []   # List of processed elements, for the frontend
        self.skipped_elements = []     # List of skipped elements, for the frontend

        #
        # Private members
        #
        self._scheduler = scheduler
        self._wait_queue = deque()
        self._done_queue = deque()

        # Only the network bound queue types are given retries
        self._max_retries = 0
        if self.queue_type in (QueueType.FETCH, QueueType.PUSH):
            self._max_retries = scheduler.context.sched_network_retries

    #####################################################
    #     Abstract Methods for Queue implementations    #
    #####################################################

    # process()
    #
    # Abstract method for processing an element
    #
    # Args:
    #    element (Element): An element to process
    #
    # Returns:
    #    (any): An optional something to be returned
    #           for every element successfully processed
    #
    #
    def process(self, element):
        pass

    # status()
    #
    # Abstract method for reporting the status of an element.
    #
    # The default implementation reports every element as ready.
    #
    # Args:
    #    element (Element): An element to process
    #
    # Returns:
    #    (QueueStatus): The element status
    #
    def status(self, element):
        return QueueStatus.READY

    # prepare()
    #
    # Abstract method for handling job preparation in the main process.
    #
    # Args:
    #    element (Element): The element which is scheduled
    #
    def prepare(self, element):
        pass

    # done()
    #
    # Abstract method for handling a successful job completion.
    #
    # Args:
    #    element (Element): The element which completed processing
    #    result (any): The return value of the process() implementation
    #    success (bool): True if the process() implementation did not
    #                    raise any exception
    #
    # Returns:
    #    (bool): True if the element should appear to be processed,
    #            Otherwise False will count the element as "skipped"
    #
    def done(self, element, result, success):
        pass

    #####################################################
    #          Scheduler / Pipeline facing APIs         #
    #####################################################

    # enqueue()
    #
    # Enqueues some elements
    #
    # Elements whose status is QueueStatus.SKIP are placed directly
    # on the done queue and recorded as skipped, everything else
    # is placed on the wait queue.
    #
    # Args:
    #    elts (list): A list of Elements
    #
    def enqueue(self, elts):
        if not elts:
            return

        # Partition in a single pass, status() is consulted exactly
        # once per element as implementations may have side effects
        skip = []
        wait = []
        for elt in elts:
            if self.status(elt) == QueueStatus.SKIP:
                skip.append(elt)
            else:
                wait.append(elt)

        self._wait_queue.extend(wait)
        self._done_queue.extend(skip)
        self.skipped_elements.extend(skip)

    # dequeue()
    #
    # A generator which dequeues the elements which
    # are ready to exit the queue.
    #
    # Yields:
    #    (Element): Elements being dequeued
    #
    def dequeue(self):
        while self._done_queue:
            yield self._done_queue.popleft()

    # dequeue_ready()
    #
    # Reports whether there are any elements to dequeue
    #
    # Returns:
    #    (bool): Whether there are elements to dequeue
    #
    def dequeue_ready(self):
        # Test the container, not the truthiness of what it contains
        return bool(self._done_queue)

    # process_ready()
    #
    # Process elements in the queue, moving elements which were enqueued
    # into the dequeue pool, and processing them if necessary.
    #
    # This will have different results for elements depending
    # on the Queue.status() implementation.
    #
    #   o Elements which are QueueStatus.WAIT will not be affected
    #
    #   o Elements which are QueueStatus.READY will be processed
    #     and added to the Queue.active_jobs list as a result,
    #     given that the scheduler allows the Queue enough tokens
    #     for the given queue's job type
    #
    #   o Elements which are QueueStatus.SKIP will move directly
    #     to the dequeue pool
    #
    def process_ready(self):
        scheduler = self._scheduler
        unready = []

        while self._wait_queue and scheduler.get_job_token(self.queue_type):
            element = self._wait_queue.popleft()

            status = self.status(element)
            if status == QueueStatus.WAIT:
                # No job is started, give the token back
                scheduler.put_job_token(self.queue_type)
                unready.append(element)
                continue
            elif status == QueueStatus.SKIP:
                # No job is started, give the token back
                scheduler.put_job_token(self.queue_type)
                self._done_queue.append(element)
                self.skipped_elements.append(element)
                continue

            self.prepare(element)

            job = Job(scheduler, element, self.action_name,
                      self.process, self._job_done,
                      max_retries=self._max_retries)
            scheduler.job_starting(job)

            job.spawn()
            self.active_jobs.append(job)

        # These were not ready but were in the beginning, give em
        # first priority again next time around.
        #
        # deque.extendleft() inserts one item at a time and thereby
        # reverses its argument, so feed it in reverse in order to
        # keep the elements in their original relative order.
        self._wait_queue.extendleft(reversed(unready))

    #####################################################
    #                 Private Methods                   #
    #####################################################

    # _update_workspaces()
    #
    # Updates and possibly saves the workspaces in the
    # main data model in the main process after a job completes.
    #
    # Errors while saving are reported as messages and are not raised.
    #
    # Args:
    #    element (Element): The element which completed
    #    job (Job): The job which completed
    #
    def _update_workspaces(self, element, job):
        # Handle any workspace modifications now
        #
        if job.workspace_dict:
            context = element._get_context()
            workspaces = context.get_workspaces()
            if workspaces.update_workspace(element._get_full_name(), job.workspace_dict):
                try:
                    workspaces.save_config()
                except BstError as e:
                    self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
                except Exception:   # pylint: disable=broad-except
                    self._message(element, MessageType.BUG,
                                  "Unhandled exception while saving workspaces",
                                  detail=traceback.format_exc())

    # _job_done()
    #
    # A callback reported by the Job() when a job completes
    #
    # This will call the Queue implementation specific Queue.done()
    # implementation and trigger the scheduler to reschedule.
    #
    # See the Job object for an explanation of the call signature
    #
    def _job_done(self, job, element, success, result):

        # Remove from our jobs
        self.active_jobs.remove(job)

        # Update workspaces in the main task before calling any queue implementation
        self._update_workspaces(element, job)

        # Give the result of the job to the Queue implementor,
        # and determine if it should be considered as processed
        # or skipped.
        try:
            processed = self.done(element, result, success)

        except BstError as e:

            # Report error and mark as failed
            #
            self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
            self.failed_elements.append(element)

            # Treat this as a task error as it's related to a task
            # even though it did not occur in the task context
            #
            # This just allows us stronger testing capability
            #
            set_last_task_error(e.domain, e.reason)

        except Exception:   # pylint: disable=broad-except

            # Report unhandled exceptions and mark as failed
            #
            self._message(element, MessageType.BUG,
                          "Unhandled exception in post processing",
                          detail=traceback.format_exc())
            self.failed_elements.append(element)
        else:

            # No exception occurred, handle the success/failure state in the normal way
            #
            if success:
                self._done_queue.append(element)
                if processed:
                    self.processed_elements.append(element)
                else:
                    self.skipped_elements.append(element)
            else:
                self.failed_elements.append(element)

        # Give the token for this job back to the scheduler
        # immediately before invoking another round of scheduling
        self._scheduler.put_job_token(self.queue_type)

        # Notify frontend
        self._scheduler.job_completed(self, job, success)

        self._scheduler.sched()

    # Convenience wrapper for Queue implementations to send
    # a message for the element they are processing
    def _message(self, element, message_type, brief, **kwargs):
        context = element._get_context()
        message = Message(element._get_unique_id(), message_type, brief, **kwargs)
        context.message(message)
# TrackQueue()
#
# A Queue implementation which tracks the sources of elements
#
class TrackQueue(Queue):

    action_name = "Track"
    complete_name = "Tracked"
    queue_type = QueueType.FETCH

    # The job itself only asks the element to track its sources,
    # whatever that returns is handed to done() as the result.
    def process(self, element):
        return element._track()

    # Elements which have sources need to be processed, elements
    # without any sources are skipped, but are nonetheless marked
    # as tracked first.
    def status(self, element):
        if list(element.sources()):
            return QueueStatus.READY

        element._tracking_done()
        return QueueStatus.SKIP

    # Saves the new refs found by a successful job, one source at
    # a time, and reports whether any of them actually changed.
    def done(self, element, result, success):

        # Nothing to save for a failed job
        if not success:
            return False

        any_changed = False

        for unique_id, new_ref in result:
            source = _plugin_lookup(unique_id)
            try:
                saved = source._save_ref(new_ref)
            except SourceError as e:
                # FIXME: We currently dont have a clear path to
                #        fail the scheduler from the main process, so
                #        this will just warn and BuildStream will exit
                #        with a success code.
                #
                source.warn("Failed to update project file",
                            detail="{}".format(e))
            else:
                # A single changed source is enough to appear processed
                if saved:
                    any_changed = True

        element._tracking_done()

        # When no ref changed, the element appears as skipped
        return any_changed
# TrackQueue()
#
# The queue in which the sources of elements get tracked
#
class TrackQueue(Queue):

    action_name = "Track"
    complete_name = "Tracked"
    queue_type = QueueType.FETCH

    # Tracking is delegated to the element, the value it returns
    # becomes the result which done() receives.
    def process(self, element):
        return element._track()

    # An element with no sources at all has nothing to track, it is
    # marked as tracked right away and skips the queue.
    def status(self, element):
        has_sources = bool(list(element.sources()))
        if has_sources:
            return QueueStatus.READY

        element._tracking_done()
        return QueueStatus.SKIP

    # Sets the new refs in the main process for a job which succeeded,
    # and tells the caller whether at least one source has changed.
    def done(self, element, result, success):

        if not success:
            return False

        # One entry per source whose ref could be saved
        outcomes = []

        for unique_id, new_ref in result:
            source = _plugin_lookup(unique_id)
            try:
                outcomes.append(bool(source._save_ref(new_ref)))
            except SourceError as e:
                # FIXME: We currently dont have a clear path to
                #        fail the scheduler from the main process, so
                #        this will just warn and BuildStream will exit
                #        with a success code.
                #
                source.warn("Failed to update project file",
                            detail="{}".format(e))

        element._tracking_done()

        # We'll appear as a skipped element if tracking resulted in no change
        return any(outcomes)