diff options
author | Jürg Billeter <j@bitron.ch> | 2018-01-23 10:54:08 +0000 |
---|---|---|
committer | Jürg Billeter <j@bitron.ch> | 2018-01-23 12:03:37 +0000 |
commit | 68cef134a22f1df01f0c33c024a99555319f86df (patch) | |
tree | 22997885b961876b08a435b43f0c9e25313b548a | |
parent | 4df5d035f62ecee62d679323e4e770625957827c (diff) | |
download | buildstream-68cef134a22f1df01f0c33c024a99555319f86df.tar.gz |
_scheduler: Replace skip() and ready() with status()
Queue sometimes called ready() before skip() and sometimes it didn't.
This change consolidates the two methods to ensure we always determine
the correct status.
-rw-r--r-- | buildstream/_scheduler/__init__.py | 2 | ||||
-rw-r--r-- | buildstream/_scheduler/buildqueue.py | 13 | ||||
-rw-r--r-- | buildstream/_scheduler/fetchqueue.py | 11 | ||||
-rw-r--r-- | buildstream/_scheduler/pullqueue.py | 14 | ||||
-rw-r--r-- | buildstream/_scheduler/pushqueue.py | 9 | ||||
-rw-r--r-- | buildstream/_scheduler/queue.py | 47 | ||||
-rw-r--r-- | buildstream/_scheduler/trackqueue.py | 9 |
7 files changed, 59 insertions, 46 deletions
diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py index 1ad6ec79b..14cdebf8e 100644 --- a/buildstream/_scheduler/__init__.py +++ b/buildstream/_scheduler/__init__.py @@ -18,7 +18,7 @@ # Authors: # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> -from .queue import Queue, QueueType +from .queue import Queue, QueueStatus, QueueType from .fetchqueue import FetchQueue from .trackqueue import TrackQueue diff --git a/buildstream/_scheduler/buildqueue.py b/buildstream/_scheduler/buildqueue.py index 6d1857313..74a7544c0 100644 --- a/buildstream/_scheduler/buildqueue.py +++ b/buildstream/_scheduler/buildqueue.py @@ -19,7 +19,7 @@ # Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> # Jürg Billeter <juerg.billeter@codethink.co.uk> -from . import Queue, QueueType +from . import Queue, QueueStatus, QueueType # A queue which assembles elements @@ -34,14 +34,17 @@ class BuildQueue(Queue): element._assemble() return element._get_unique_id() - def ready(self, element): + def status(self, element): # state of dependencies may have changed, recalculate element state element._update_state() - return element._buildable() + if element._cached(): + return QueueStatus.SKIP - def skip(self, element): - return element._cached() + if not element._buildable(): + return QueueStatus.WAIT + + return QueueStatus.READY def done(self, element, result, returncode): # Elements are cached after they are successfully assembled diff --git a/buildstream/_scheduler/fetchqueue.py b/buildstream/_scheduler/fetchqueue.py index b284d5b35..ebc1fa17a 100644 --- a/buildstream/_scheduler/fetchqueue.py +++ b/buildstream/_scheduler/fetchqueue.py @@ -23,7 +23,7 @@ from .. import Consistency # Local imports -from . import Queue, QueueType +from . import Queue, QueueStatus, QueueType # A queue which fetches element sources @@ -43,14 +43,17 @@ class FetchQueue(Queue): for source in element.sources(): source._fetch() - def skip(self, element): + def status(self, element): # Optionally skip elements that are already in the artifact cache if self.skip_cached and element._cached(): - return True + return QueueStatus.SKIP # This will automatically skip elements which # have no sources. - return element._consistency() == Consistency.CACHED + if element._consistency() == Consistency.CACHED: + return QueueStatus.SKIP + + return QueueStatus.READY def done(self, element, result, returncode): diff --git a/buildstream/_scheduler/pullqueue.py b/buildstream/_scheduler/pullqueue.py index b8091e24d..3ab5ab7f4 100644 --- a/buildstream/_scheduler/pullqueue.py +++ b/buildstream/_scheduler/pullqueue.py @@ -23,7 +23,7 @@ from ..element import _KeyStrength # Local imports -from . import Queue, QueueType +from . import Queue, QueueStatus, QueueType # A queue which pulls element artifacts @@ -38,20 +38,20 @@ class PullQueue(Queue): # returns whether an artifact was downloaded or not return element._pull() - def skip(self, element): + def status(self, element): if element._cached(strength=_KeyStrength.STRONG): - return True + return QueueStatus.SKIP elif element._remotely_cached(strength=_KeyStrength.STRONG): # pull artifact using strong key - return False + return QueueStatus.READY elif element._cached(): - return True + return QueueStatus.SKIP elif element._remotely_cached(): # pull artifact using weak key - return False + return QueueStatus.READY else: # nothing to pull - return True + return QueueStatus.SKIP def done(self, element, result, returncode): diff --git a/buildstream/_scheduler/pushqueue.py b/buildstream/_scheduler/pushqueue.py index 6ffaf7949..6c7c95094 100644 --- a/buildstream/_scheduler/pushqueue.py +++ b/buildstream/_scheduler/pushqueue.py @@ -20,7 +20,7 @@ # Jürg Billeter <juerg.billeter@codethink.co.uk> # Local imports -from . import Queue, QueueType +from . import Queue, QueueStatus, QueueType # A queue which pushes element artifacts @@ -35,8 +35,11 @@ class PushQueue(Queue): # returns whether an artifact was uploaded or not return element._push() - def skip(self, element): - return element._skip_push() + def status(self, element): + if element._skip_push(): + return QueueStatus.SKIP + + return QueueStatus.READY def done(self, element, result, returncode): diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queue.py index 4642edaa7..46640677f 100644 --- a/buildstream/_scheduler/queue.py +++ b/buildstream/_scheduler/queue.py @@ -21,6 +21,7 @@ # System imports from collections import deque +from enum import Enum # Local imports from .job import Job @@ -40,6 +41,20 @@ class QueueType(): PUSH = 3 +# Queue status for a given element +# +# +class QueueStatus(Enum): + # The element is waiting for dependencies. + WAIT = 1 + + # The element can skip this queue. + SKIP = 2 + + # The element is ready for processing in this queue. + READY = 3 + + # Queue() # # @@ -89,33 +104,18 @@ class Queue(): def process(self, element): pass - # ready() - # - # Abstract method for reporting whether an element - # is ready for processing in this queue or not. - # - # Args: - # element (Element): An element to process - # - # Returns: - # (bool): Whether the element is ready for processing - # - def ready(self, element): - return True - - # skip() + # status() # - # Abstract method for reporting whether an element - # can be skipped for this phase. + # Abstract method for reporting the status of an element. # # Args: # element (Element): An element to process # # Returns: - # (bool): Whether the element can be skipped + # (QueueStatus): The element status # - def skip(self, element): - return False + def status(self, element): + return QueueStatus.READY # done() # @@ -149,7 +149,7 @@ class Queue(): # Place skipped elements directly on the done queue elts = list(elts) - skip = [elt for elt in elts if self.skip(elt)] + skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP] wait = [elt for elt in elts if elt not in skip] self.wait_queue.extend(wait) @@ -167,11 +167,12 @@ class Queue(): while len(self.wait_queue) > 0 and scheduler.get_job_token(self.queue_type): element = self.wait_queue.popleft() - if not self.ready(element): + status = self.status(element) + if status == QueueStatus.WAIT: scheduler.put_job_token(self.queue_type) unready.append(element) continue - elif self.skip(element): + elif status == QueueStatus.SKIP: scheduler.put_job_token(self.queue_type) self.done_queue.append(element) self.skipped_elements.append(element) diff --git a/buildstream/_scheduler/trackqueue.py b/buildstream/_scheduler/trackqueue.py index 3f35aff47..f78479c49 100644 --- a/buildstream/_scheduler/trackqueue.py +++ b/buildstream/_scheduler/trackqueue.py @@ -27,7 +27,7 @@ from ..plugin import _plugin_lookup from .. import _yaml # Local imports -from . import Queue, QueueType +from . import Queue, QueueStatus, QueueType # A queue which tracks sources @@ -45,9 +45,12 @@ class TrackQueue(Queue): def process(self, element): return element._track() - def skip(self, element): + def status(self, element): # We can skip elements entirely if they have no sources. - return len(list(element.sources())) == 0 + if len(list(element.sources())) == 0: + return QueueStatus.SKIP + + return QueueStatus.READY def done(self, element, result, returncode): |